Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces an enhanced dynamic subagent management system, allowing the main agent to create, manage, and communicate with specialized subagents. Key additions include a DynamicSubAgentManager for lifecycle and shared context handling, a dedicated SubAgentLogger, and integration into the main agent's toolset and prompt construction. Feedback focuses on improving error handling by avoiding broad exception silences, ensuring safe string formatting for system prompts, and refining the logic for detecting subagent creation failures. A minor typo in the subagent capability prompt was also identified.
|
主要实现已经完成,还需要更多测试 |
There was a problem hiding this comment.
Hey - I've found 6 issues, and left some high level feedback:
- In
DynamicSubAgentManager.update_subagent_history, you mutate thecurrent_messageslist (removing system messages) while iterating it, which can skip elements; consider building a new filtered list instead of modifying in-place during iteration. _do_handoff_backgroundusesDynamicSubAgentManagerwithout importing it in that scope, which will raise aNameErrorat runtime; add the appropriate import (similar to_execute_handoff) before calling its methods.- Several places access
session.subagent_status[agent_name]directly (e.g., inget_subagent_status,create_pending_subagent_task, andWaitForSubagentTool), which can throwKeyErrorif the status was never set; consider using.get()with a sensible default or checking membership before indexing.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `DynamicSubAgentManager.update_subagent_history`, you mutate the `current_messages` list (removing system messages) while iterating it, which can skip elements; consider building a new filtered list instead of modifying in-place during iteration.
- `_do_handoff_background` uses `DynamicSubAgentManager` without importing it in that scope, which will raise a `NameError` at runtime; add the appropriate import (similar to `_execute_handoff`) before calling its methods.
- Several places access `session.subagent_status[agent_name]` directly (e.g., in `get_subagent_status`, `create_pending_subagent_task`, and `WaitForSubagentTool`), which can throw `KeyError` if the status was never set; consider using `.get()` with a sensible default or checking membership before indexing.
## Individual Comments
### Comment 1
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="269-275" />
<code_context>
+ if agent_name not in session.subagent_histories:
+ session.subagent_histories[agent_name] = []
+
+ if isinstance(current_messages, list):
+ _MAX_TOOL_RESULT_LEN = 2000
+ for msg in current_messages:
+ if (
+ isinstance(msg, dict) and msg.get("role") == "system"
+ ): # 移除system消息
+ current_messages.remove(msg)
+ # 对过长的 tool 结果做截断,避免单条消息占用过多空间
+ if (
</code_context>
<issue_to_address>
**issue (bug_risk):** Avoid mutating `current_messages` while iterating over it when cleaning system/tool messages.
In `update_subagent_history`, the loop both iterates over and mutates `current_messages` via `current_messages.remove(msg)`, which can cause items to be skipped and unintentionally alters the caller’s list (e.g. `runner_messages`). Instead, build a separate `cleaned_msgs` list and append only non-system messages, truncating long tool messages as needed, then extend `session.subagent_histories[agent_name]` with `cleaned_msgs` rather than the original list.
</issue_to_address>
### Comment 2
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="1544-1551" />
<code_context>
+
+ if status == "IDLE":
+ return f"Error: SubAgent '{subagent_name}' is running no tasks."
+ elif status == "COMPLETED":
+ result = DynamicSubAgentManager.get_subagent_result(
+ session_id, subagent_name, task_id
+ )
+ if result and (result.result != "" or result.completed_at > 0):
+ return f"SubAgent '{result.agent_name}' execution completed\n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n--- Result ---\n{result.result}\n"
+ else:
+ return f"SubAgent '{result.agent_name}' execution completed with empty results. \n Task id: {result.task_id}\n Execution time: {result.execution_time:.1f}s\n"
+ elif status == "FAILED":
+ result = DynamicSubAgentManager.get_subagent_result(
</code_context>
<issue_to_address>
**issue (bug_risk):** Guard against `result` being `None` in `WaitForSubagentTool` before dereferencing.
In the `COMPLETED` (and similarly `FAILED`) branch, `result` may be `None`. In that case the `if result and ...` condition is false and the `else` dereferences `result.agent_name`, causing an exception. Handle the `None` case first, e.g.:
```python
result = DynamicSubAgentManager.get_subagent_result(...)
if not result:
return (
f"Error: No result found for SubAgent '{subagent_name}' "
f"task {task_id or '(latest)'} despite status {status}."
)
if result.result != "" or result.completed_at > 0:
...
else:
...
```
</issue_to_address>
### Comment 3
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="503-507" />
<code_context>
async def _run_handoff_in_background() -> None:
try:
await cls._do_handoff_background(
tool=tool,
run_context=run_context,
- task_id=task_id,
+ task_id=original_task_id,
+ subagent_task_id=subagent_task_id,
**tool_args,
)
</code_context>
<issue_to_address>
**question (bug_risk):** Passing `subagent_task_id` through `**tool_args` may leak an unexpected parameter into the subagent tool call.
`subagent_task_id` is added to `tool_args` in `_execute_handoff_background`, then `_do_handoff_background` passes `**tool_args` to `_execute_handoff`, so it becomes an argument to the underlying handoff tool. If that tool’s schema doesn’t declare `subagent_task_id`, this may cause validation failures or be silently dropped. If the subagent doesn’t need this value, pop it from `tool_args` and use it only for bookkeeping; if it does, ensure it’s documented and included in the tool schema.
</issue_to_address>
### Comment 4
<location path="astrbot/core/dynamic_subagent_manager.py" line_range="776" />
<code_context>
+ return list(session.handoff_tools.values())
+
+ @classmethod
+ def create_pending_subagent_task(cls, session_id: str, agent_name: str) -> str:
+ """为 SubAgent 创建一个 pending 任务,返回 task_id
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing small internal helpers to centralize background task result handling and status access, replacing the current scattered direct dict manipulations with reusable abstractions.
A lot of the added complexity comes from repeated, ad‑hoc manipulation of `session.subagent_background_results` and `session.subagent_status` across multiple methods. You can significantly reduce cognitive load with a small, local abstraction without changing behavior.
### 1. Centralize background task result logic
These methods currently reimplement similar logic over `session.subagent_background_results`:
- `create_pending_subagent_task`
- `get_pending_subagent_tasks`
- `get_latest_task_id`
- `store_subagent_result`
- `get_subagent_result`
- `has_subagent_result`
- `clear_subagent_result`
You can introduce a very small internal helper to encapsulate result access and “completed” checks, and then reuse it. For example:
```python
# inside DynamicSubAgentManager
@classmethod
def _ensure_task_store(cls, session: DynamicSubAgentSession, agent_name: str) -> dict[str, SubAgentExecutionResult]:
if agent_name not in session.subagent_background_results:
session.subagent_background_results[agent_name] = {}
return session.subagent_background_results[agent_name]
@staticmethod
def _is_task_completed(result: SubAgentExecutionResult) -> bool:
return (result.result != "" and result.result is not None) or result.completed_at > 0
```
Then, for example, `get_pending_subagent_tasks` and `get_subagent_result` become much simpler and more uniform:
```python
@classmethod
def get_pending_subagent_tasks(cls, session_id: str, agent_name: str) -> list[str]:
session = cls.get_session(session_id)
if not session:
return []
store = session.subagent_background_results.get(agent_name)
if not store:
return []
pending = [tid for tid, res in store.items() if not cls._is_task_completed(res)]
return sorted(pending, key=lambda tid: store[tid].created_at)
```
```python
@classmethod
def get_subagent_result(
cls, session_id: str, agent_name: str, task_id: str | None = None
) -> SubAgentExecutionResult | None:
session = cls.get_session(session_id)
if not session:
return None
store = session.subagent_background_results.get(agent_name)
if not store:
return None
if task_id is None:
completed = [res for res in store.values() if cls._is_task_completed(res)]
if not completed:
return None
return max(completed, key=lambda r: r.created_at)
return store.get(task_id)
```
`store_subagent_result`, `has_subagent_result`, and `clear_subagent_result` can likewise be rewritten to go through `_ensure_task_store` and `_is_task_completed`. This keeps all functionality but removes duplicated filter/sorting logic and makes status/result flows easier to reason about.
### 2. Make subagent status access safe and consistent
Currently `get_subagent_status` assumes the key exists:
```python
@classmethod
def get_subagent_status(cls, session_id: str, agent_name: str) -> str:
session = cls.get_session(session_id)
return session.subagent_status[agent_name]
```
A small helper avoids scattered `KeyError` assumptions and defensive checks elsewhere:
```python
@classmethod
def get_subagent_status(cls, session_id: str, agent_name: str) -> str:
session = cls.get_session(session_id)
if not session:
return "UNKNOWN"
return session.subagent_status.get(agent_name, "UNKNOWN")
```
Then consumers like `ListDynamicSubagentsTool.call` and `WaitForSubagentTool.call` can rely on this instead of touching the dict shape implicitly. This reduces cross‑coupling on internal dict structure and status keys while preserving all existing behavior.
</issue_to_address>
### Comment 5
<location path="astrbot/core/astr_agent_tool_exec.py" line_range="25" />
<code_context>
BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT,
)
from astrbot.core.cron.events import CronMessageEvent
+from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
from astrbot.core.message.components import Image
from astrbot.core.message.message_event_result import (
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting DynamicSubAgentManager-related logic and subagent orchestration into small helper functions to reduce duplication and make the main methods easier to read and maintain.
You can keep the new functionality but reduce the complexity and repetition with a few small helpers and a single DynamicSubAgentManager access point.
### 1. Centralize `DynamicSubAgentManager` access
You currently `from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager` multiple times and mix that with other symbols (e.g. `SEND_SHARED_CONTEXT_TOOL`).
Pull this into a module-level helper so the core methods aren’t cluttered with dynamic imports and try/except:
```python
# near the top of the module
try:
from astrbot.core import dynamic_subagent_manager as _dsm_mod
except Exception: # noqa: BLE001
_dsm_mod = None
def _get_dynamic_subagent_manager():
return getattr(_dsm_mod, "DynamicSubAgentManager", None)
def _get_send_shared_context_tool():
return getattr(_dsm_mod, "SEND_SHARED_CONTEXT_TOOL", None)
```
Then in the methods:
```python
toolset = cls._build_handoff_toolset(run_context, tool.agent.tools)
dsm = _get_dynamic_subagent_manager()
send_shared_context_tool = _get_send_shared_context_tool()
session_id = event.unified_msg_origin
if dsm and send_shared_context_tool:
session = dsm.get_session(session_id)
if session and session.shared_context_enabled:
toolset.add_tool(send_shared_context_tool)
```
This removes repetitive imports and exception handling from the hot paths.
---
### 2. Extract subagent history loading logic
The inlined history loading in `_execute_handoff` is long and includes type checks and try/excepts. Extract it:
```python
def _load_subagent_history(umo: str, agent_name: str | None) -> list[Message]:
dsm = _get_dynamic_subagent_manager()
if not (dsm and agent_name):
return []
history: list[Message] = []
try:
stored_history = dsm.get_subagent_history(umo, agent_name)
if not stored_history:
return []
for hist_msg in stored_history:
try:
if isinstance(hist_msg, Message):
history.append(hist_msg)
elif isinstance(hist_msg, dict):
history.append(Message.model_validate(hist_msg))
except Exception:
continue
if history:
logger.debug(
"[SubAgentHistory] Loaded %d history messages for %s",
len(history),
agent_name,
)
except Exception as e: # noqa: BLE001
logger.warning(
"[SubAgentHistory] Failed to load history for %s: %s",
agent_name,
e,
)
return history
```
Usage in `_execute_handoff`:
```python
agent_name = getattr(tool.agent, "name", None)
subagent_history = _load_subagent_history(umo, agent_name)
if subagent_history:
contexts = (subagent_history + (contexts or [])) if contexts else subagent_history
```
This keeps `_execute_handoff` focused on orchestration rather than the details of history storage.
---
### 3. Extract subagent system prompt construction
You build `subagent_system_prompt` inline and re-import `DynamicSubAgentManager` again. Move this into a helper:
```python
def _build_subagent_system_prompt(
tool: HandoffTool,
umo: str,
prov_settings: dict,
) -> str:
agent_name = getattr(tool.agent, "name", None)
base = tool.agent.instructions or ""
system_prompt = f"# Role\nYour name is {agent_name}(used for tool calling)\n{base}\n"
dsm = _get_dynamic_subagent_manager()
if not (dsm and agent_name):
return system_prompt
try:
runtime = prov_settings.get("computer_use_runtime", "local")
static_prompt = dsm.build_static_subagent_prompts(umo, agent_name)
dynamic_prompt = dsm.build_dynamic_subagent_prompts(umo, agent_name, runtime)
system_prompt += static_prompt
system_prompt += dynamic_prompt
except Exception:
pass
return system_prompt
```
Then in `_execute_handoff`:
```python
prov_settings: dict = ctx.get_config(umo=umo).get("provider_settings", {})
subagent_system_prompt = _build_subagent_system_prompt(tool, umo, prov_settings)
runner_messages: list[Message] = []
llm_resp = await ctx.tool_loop_agent(
event=event,
chat_provider_id=prov_id,
prompt=input_,
image_urls=image_urls,
system_prompt=subagent_system_prompt,
tools=toolset,
contexts=contexts,
max_steps=agent_max_step,
tool_call_timeout=run_context.tool_call_timeout,
stream=stream,
runner_messages=runner_messages,
)
```
History persistence can also be extracted:
```python
def _persist_subagent_history(umo: str, agent_name: str | None, runner_messages: list[Message]) -> None:
if not (agent_name and runner_messages):
return
dsm = _get_dynamic_subagent_manager()
if not dsm:
return
try:
dsm.update_subagent_history(umo, agent_name, runner_messages)
except Exception:
pass
```
And called after `tool_loop_agent`.
---
### 4. Extract enhanced background-task registration and user message
The enhanced branch in `_execute_handoff_background` mixes session lookup, task creation, status, and user text. Isolate it:
```python
def _register_enhanced_subagent_task(
umo: str,
agent_name: str | None,
) -> str | None:
dsm = _get_dynamic_subagent_manager()
if not (dsm and agent_name):
return None
try:
session = dsm.get_session(umo)
if not (session and agent_name in session.subagents):
return None
subagent_task_id = dsm.create_pending_subagent_task(
session_id=umo, agent_name=agent_name
)
if subagent_task_id.startswith("__PENDING_TASK_CREATE_FAILED__"):
logger.info(
"[EnhancedSubAgent:BackgroundTask] Failed to create background task %s for %s",
subagent_task_id,
agent_name,
)
return None
dsm.set_subagent_status(
session_id=umo, agent_name=agent_name, status="RUNNING"
)
logger.info(
"[EnhancedSubAgent:BackgroundTask] Created background task %s for %s",
subagent_task_id,
agent_name,
)
return subagent_task_id
except Exception as e: # noqa: BLE001
logger.info(
"[EnhancedSubAgent:BackgroundTask] Failed to create background task for %s: %s",
agent_name,
e,
)
return None
```
```python
def _build_background_submission_message(
agent_name: str | None,
original_task_id: str,
subagent_task_id: str | None,
) -> mcp.types.TextContent:
if subagent_task_id:
text = (
f"Background task submitted. subagent_task_id={subagent_task_id}. "
f"SubAgent '{agent_name}' is working on the task. "
f"Use wait_for_subagent(subagent_name='{agent_name}', task_id='{subagent_task_id}') to get the result."
)
else:
text = (
f"Background task submitted. task_id={original_task_id}. "
f"SubAgent '{agent_name}' is working on the task. "
"You will be notified when it finishes."
)
return mcp.types.TextContent(type="text", text=text)
```
Then `_execute_handoff_background` reduces to:
```python
event = run_context.context.event
umo = event.unified_msg_origin
agent_name = getattr(tool.agent, "name", None)
subagent_task_id = _register_enhanced_subagent_task(umo, agent_name)
original_task_id = uuid.uuid4().hex
async def _run_handoff_in_background() -> None:
try:
await cls._do_handoff_background(
tool=tool,
run_context=run_context,
task_id=original_task_id,
subagent_task_id=subagent_task_id,
**tool_args,
)
except Exception as e: # noqa: BLE001
logger.error(
"Background handoff %s (%s) failed: %s",
original_task_id,
tool.name,
e,
exc_info=True,
)
asyncio.create_task(_run_handoff_in_background())
text_content = _build_background_submission_message(
agent_name, original_task_id, subagent_task_id
)
yield mcp.types.CallToolResult(content=[text_content])
```
---
### 5. Split enhanced result handling in `_do_handoff_background`
The method currently mixes execution, enhanced result storage, status updates, shared context, and main-agent wakeup logic.
You can keep `_do_handoff_background` as an orchestrator and push the detailed branching into helpers:
```python
def _is_enhanced_subagent(umo: str, agent_name: str | None) -> tuple[bool, T.Any]:
dsm = _get_dynamic_subagent_manager()
if not (dsm and agent_name):
return False, None
session = dsm.get_session(umo)
if session and agent_name in session.subagents:
return True, session
return False, session
```
```python
async def _handle_enhanced_subagent_background_result(
cls,
*,
umo: str,
agent_name: str,
task_id: str | None,
result_text: str,
error_text: str | None,
execution_time: float,
run_context: ContextWrapper[AstrAgentContext],
tool: HandoffTool,
tool_args: dict,
) -> None:
dsm = _get_dynamic_subagent_manager()
if not dsm:
return
success = error_text is None
dsm.store_subagent_result(
session_id=umo,
agent_name=agent_name,
task_id=task_id,
success=success,
result=result_text.strip() if result_text else "",
error=error_text,
execution_time=execution_time,
)
dsm.set_subagent_status(
session_id=umo,
agent_name=agent_name,
status="FAILED" if error_text else "COMPLETED",
)
session = dsm.get_session(umo)
if session and session.shared_context_enabled:
status_content = (
f"[EnhancedSubAgent:BackgroundTask] SubAgent '{agent_name}' Task '{task_id}' "
f"{'Failed: ' + error_text if error_text else f'Complete. Execution Time: {execution_time:.1f}s'}"
)
dsm.add_shared_context(
session_id=umo,
sender=agent_name,
context_type="status",
content=status_content,
target="all",
)
logger.info(
"[EnhancedSubAgent:BackgroundTask] Stored result for %s task %s: success=%s, time=%.1fs",
agent_name,
task_id,
success,
execution_time,
)
if not await cls._maybe_wake_main_agent_after_background(
run_context=run_context,
tool=tool,
task_id=task_id,
agent_name=agent_name,
result_text=result_text,
tool_args=tool_args,
):
return
```
```python
@classmethod
async def _maybe_wake_main_agent_after_background(
cls,
*,
run_context: ContextWrapper[AstrAgentContext],
tool: HandoffTool,
task_id: str,
agent_name: str | None,
result_text: str,
tool_args: dict,
) -> bool:
event = run_context.context.event
try:
context_extra = getattr(run_context.context, "extra", None)
if context_extra and isinstance(context_extra, dict):
main_agent_runner = context_extra.get("main_agent_runner")
main_agent_is_running = (
main_agent_runner is not None and not main_agent_runner.done()
)
else:
main_agent_is_running = False
except Exception as e: # noqa: BLE001
logger.error("Failed to check main agent status: %s", e)
main_agent_is_running = True
if main_agent_is_running:
return False
await cls._wake_main_agent_for_background_result(
run_context=run_context,
task_id=task_id,
tool_name=tool.name,
result_text=result_text,
tool_args=tool_args,
note=(
event.get_extra("background_note")
or f"Background task for subagent '{agent_name}' finished."
),
summary_name=f"Dedicated to subagent `{agent_name}`",
extra_result_fields={"subagent_name": agent_name},
)
return True
```
Then `_do_handoff_background` can collapse to:
```python
@classmethod
async def _do_handoff_background(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
task_id: str,
**tool_args,
) -> None:
start_time = time.time()
result_text = ""
error_text: str | None = None
tool_args = dict(tool_args)
tool_args["image_urls"] = await cls._collect_handoff_image_urls(
run_context, tool_args.get("image_urls")
)
event = run_context.context.event
umo = event.unified_msg_origin
agent_name = getattr(tool.agent, "name", None)
try:
async for r in cls._execute_handoff(
tool,
run_context,
image_urls_prepared=True,
**tool_args,
):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e: # noqa: BLE001
error_text = str(e)
result_text = (
f"error: Background task execution failed, internal error: {e!s}"
)
execution_time = time.time() - start_time
is_enhanced, _ = _is_enhanced_subagent(umo, agent_name)
if is_enhanced:
await _handle_enhanced_subagent_background_result(
cls=cls,
umo=umo,
agent_name=agent_name,
task_id=tool_args.get("subagent_task_id"),
result_text=result_text,
error_text=error_text,
execution_time=execution_time,
run_context=run_context,
tool=tool,
tool_args=tool_args,
)
else:
await cls._wake_main_agent_for_background_result(
run_context=run_context,
task_id=task_id,
tool_name=tool.name,
result_text=result_text,
tool_args=tool_args,
note=(
event.get_extra("background_note")
or f"Background task for subagent '{agent_name}' finished."
),
summary_name=f"Dedicated to subagent `{agent_name}`",
extra_result_fields={"subagent_name": agent_name},
)
```
This keeps the orchestration logic in the high-level methods and moves the deeply branched behavior into focused helpers, preserving your enhanced behavior while making each method easier to scan and maintain.
</issue_to_address>
### Comment 6
<location path="astrbot/core/agent/runners/tool_loop_agent_runner.py" line_range="920" />
<code_context>
- func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name)
- else:
- func_tool = req.func_tool.get_tool(func_tool_name)
+ # First check if it's a dynamically created subagent tool
+ func_tool = None
+ run_context_context = getattr(self.run_context, "context", None)
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the dynamic tool resolution and dynamic tool registration logic into dedicated helper methods so `_handle_tool_call` remains linear and easier to read.
You can keep the new behavior but pull the dynamic concerns into helpers to reduce branching and duplication in `_handle_tool_call`.
### 1. Extract dynamic tool resolution into a helper
Move the dynamic lookup logic (including the `DynamicSubAgentManager` import and session handling) into a small helper. That keeps `_handle_tool_call` linear and easier to follow:
```python
def _resolve_dynamic_tool(self, func_tool_name: str):
run_context_context = getattr(self.run_context, "context", None)
if run_context_context is None:
return None
event = getattr(run_context_context, "event", None)
if event is None:
return None
session_id = getattr(event, "unified_msg_origin", None)
if not session_id:
return None
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
dynamic_handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(session_id)
except Exception:
return None
for h in dynamic_handoffs:
if h.name == func_tool_name or f"transfer_to_{h.name}" == func_tool_name:
return h
return None
```
Then the main code becomes simpler:
```python
if not req.func_tool:
return
# Prefer dynamic tools when available
func_tool = self._resolve_dynamic_tool(func_tool_name)
if func_tool is None:
if self.tool_schema_mode == "skills_like" and self._skill_like_raw_tool_set:
func_tool = self._skill_like_raw_tool_set.get_tool(func_tool_name)
else:
func_tool = req.func_tool.get_tool(func_tool_name)
```
This keeps the hot path readable while preserving the dynamic behavior.
### 2. Extract dynamic-tool creation handling into a helper
Similarly, encapsulate the `__DYNAMIC_TOOL_CREATED__` protocol and registration into a helper that takes the tool result and updates `self.req.func_tool` as needed:
```python
def _maybe_register_dynamic_tool_from_result(self, result_content: str) -> None:
if not result_content.startswith("__DYNAMIC_TOOL_CREATED__:"):
return
parts = result_content.split(":", 3)
if len(parts) < 4:
return
new_tool_name = parts[1]
new_tool_obj_name = parts[2]
logger.info(f"[EnhancedSubAgent] Tool created: {new_tool_name}")
run_context_context = getattr(self.run_context, "context", None)
event = getattr(run_context_context, "event", None) if run_context_context else None
session_id = getattr(event, "unified_msg_origin", None) if event else None
if not session_id:
return
try:
from astrbot.core.dynamic_subagent_manager import DynamicSubAgentManager
handoffs = DynamicSubAgentManager.get_handoff_tools_for_session(session_id)
except Exception as e:
logger.warning(f"[EnhancedSubAgent] Failed to load dynamic handoffs: {e}")
return
for handoff in handoffs:
if (
handoff.name == new_tool_obj_name
or handoff.name == new_tool_name.replace("transfer_to_", "")
):
if self.req.func_tool:
self.req.func_tool.add_tool(handoff)
logger.info(f"[EnhancedSubAgent] Added {handoff.name} to func_tool set")
break
```
Then the loop becomes:
```python
if result_parts:
result_content = "\n\n".join(result_parts)
self._maybe_register_dynamic_tool_from_result(result_content)
inline_result = await self._materialize_large_tool_result(
tool_call_id=func_tool_id,
content=result_content,
)
_append_tool_call_result(
func_tool_id,
inline_result + self._build_repeated_tool_call_guidance(
func_tool_name, tool_call_streak
),
)
```
This removes protocol parsing, logging details, and manager lookups from the main loop and makes `_handle_tool_call` focus on “run tool → process result” while still fully supporting dynamic tools.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Modifications / 改动点
一、概述
增强版SubAgent是AstrBot原版SubAgent系统的扩展,提供了动态创建、后台任务模式、上下文记忆、工作目录隔离、技能隔离、公共上下文共享等高级功能。
核心特性
二、新增配置说明
2.1 配置项说明
在
cmd_config.json中通过enhanced_subagent配置块进行设置:{ "subagent_orchestrator": { "main_enable": false, "agents": [] }, "enhanced_subagent": { "enabled": true, "max_subagent_count": 3, "auto_cleanup_per_turn": true, "shared_context_enabled": true, "shared_context_maxlen": 200, "max_subagent_history": 500 } }配置独立性:
enhanced_subagent与subagent_orchestrator可同时使用,相互独立。主Agent使用subagent_orchestrator的transfer_to_xxx工具时,执行原版逻辑;若委派动态生成的transfer_to_xxx工具,执行增强的SubAgent逻辑。2.2 配置项详解
enabledfalsemax_subagent_count3auto_cleanup_per_turntrueshared_context_enabledtrueshared_context_maxlen200max_subagent_history500三、核心新增文件
3.1 dynamic_subagent_manager.py
文件路径:
astrbot/core/dynamic_subagent_manager.py核心数据结构:
核心类:
DynamicSubAgentManagerconfigure()create_subagent()remove_subagent()cleanup_session_turn_end()protect_subagent()is_protected()update_subagent_history()get_subagent_history()clear_subagent_history()build_static_subagent_prompts()build_dynamic_subagent_prompts()build_task_router_prompt()get_subagent_tools()register_blacklisted_tool()register_inherent_tool()add_shared_context()get_shared_context()_build_shared_context_prompt()create_pending_subagent_task()get_pending_subagent_tasks()get_latest_task_id()store_subagent_result()get_subagent_result()has_subagent_result()clear_subagent_result()get_subagent_status()get_all_subagent_status()cleanup_shared_context_by_agent()clear_shared_context()set_shared_context_enabled()get_handoff_tools_for_session()cleanup_session()3.2
subagent_logger.py(已移除)旧版中的
subagent_logger.py已被移除,日志功能统一使用astrbot.core.logger,通过[EnhancedSubAgent:xxx]前缀进行区分。四、功能实现
4.1 动态创建子代理
功能描述
主Agent可以通过工具调用动态创建子代理,每个子代理拥有独立的人设、工具、技能和工作目录配置。
实现方式
astrbot/core/dynamic_subagent_manager.pyCreateDynamicSubAgentToolname: 子代理名称system_prompt: 子代理的人设和系统提示tools: 可用工具列表(字符串名称)skills: 可用技能列表(字符串名称)workdir: 子代理工作目录(绝对路径,可选)使用示例
名称验证规则
^[a-zA-Z][a-zA-Z0-9_]{0,31}$)工作目录安全检查
CreateDynamicSubAgentTool._check_path_safety()对传入的workdir进行安全验证:..windows,system32,syswow64,boot等/etc,/bin,/sbin,/root等/System,/Library,/private/var,/usr等验证失败时,工作目录回退到
get_astrbot_temp_path()。创建后流程
astrbot_execute_shell,astrbot_execute_python以及公共上下文启用时的send_shared_context)DynamicSubAgentConfig配置对象Agent和HandoffTool对象DynamicSubAgentManager__DYNAMIC_TOOL_CREATED__标记的消息,触发工具schema刷新工具黑名单与固有工具
4.2 子代理委派 (transfer_to_xxx)
功能描述
创建子代理后,主Agent使用
transfer_to_{name}工具将任务委派给对应子代理。实现方式
astrbot/core/astr_agent_tool_exec.py_execute_handoff()/_execute_handoff_background()子代理System Prompt构建
将子代理的system_prompt构建分为静态部分和动态部分:
静态提示词
build_static_subagent_prompts():动态提示词
build_dynamic_subagent_prompts():委派执行流程
transfer_to_xxx工具ToolLoopAgentRunner._handle_function_tools()在DynamicSubAgentManager中查找动态创建的handoff工具HandoffTool对象FunctionToolExecutor._execute_handoff()执行委派send_shared_context工具到子代理工具集provider_id使用不同的LLM提供商ctx.tool_loop_agent()执行子代理循环历史上下文注入流程
subagent_histories获取历史消息Message对象begin_dialogs之前runner_messages(Agent运行期间的所有消息)追加到历史max_subagent_history(默认500条),超出时保留最新的4.3 后台任务等待
功能描述
当主Agent认为某个任务耗时很长时,可以让SubAgent以后台模式运行,主Agent不会被阻塞,可以继续执行其他可并行的任务。
新版增强了后台任务的执行流程:判断主Agent是否仍在运行,仅在主Agent已结束时通过
_wake_main_agent_for_background_result通知用户;若主Agent仍在运行,主Agent可通过wait_for_subagent以阻塞等待方式主动获取结果。实现方式
astrbot/core/astr_agent_tool_exec.py_execute_handoff_background()、_do_handoff_background()执行流程
主Agent状态检测
后台任务完成后,需要决定是否通过
_wake_main_agent_for_background_result通知用户。关键逻辑:这依赖
AstrAgentContext.extra字段(类型修改为dict[str, Any]),在build_main_agent()中注入:主动等待工具
WaitForSubagentTool 轮询逻辑
task_id,获取最早的 pending 任务(先进先出)IDLE→ 返回错误:子代理未在运行任务COMPLETED→ 返回执行结果FAILED→ 返回失败信息RUNNING→ 继续等待优势
注意事项
4.4 子代理历史记忆
功能描述
受保护的子代理可以保留跨轮对话的历史上下文,实现连续对话能力。
实现方式
DynamicSubAgentSession.subagent_historiesmax_subagent_history(默认500条)历史管理机制
runner_messages(Agent运行期间的消息)追加到历史role=system的消息_MAX_TOOL_RESULT_LEN(2000字符)的tool结果截断,附加...[truncated]max_subagent_history时,保留最新的消息Message对象,合并到contexts历史清理
reset_subagent工具可清除指定子代理的历史4.5 工作目录隔离(软约束)
功能描述
每个子代理可配置独立的工作目录,未指定时默认使用
get_astrbot_temp_path()。实现方式
DynamicSubAgentConfig.workdirCreateDynamicSubAgentTool._check_path_safety()_build_workdir_prompt()注入的工作目录提示
4.6 行为规范注入
功能描述
子代理自动注入安全模式、输出规范和时间信息等行为约束。
注入内容
静态行为规范
_build_rule_prompt():动态时间信息
_build_time_prompt():# Current Time 2026-04-12 00:22 (CST)4.7 Skills隔离
功能描述
每个子代理可分配不同的Skills,相互隔离,不共享技能配置。
注入逻辑
DynamicSubAgentConfig.skills)SkillManager.list_skills()获取所有可用技能build_skills_prompt()生成提示词build_dynamic_subagent_prompts()注入到子代理的system_prompt4.8 公共上下文共享
功能描述
当
shared_context_enabled=true时,维护一个所有子代理共享的、实时更新的上下文区域。在SubAgent每次调用LLM之前,公共上下文的内容会被注入到System Prompt中。公共上下文中每条信息的格式
上下文类型
statusmessagesystem公共上下文注入方式
按类型和优先级分组注入到子代理的 System Prompt 中,让Agent可以更清晰地获取共享上下文的信息,并知道哪些需要优先处理。
实现方式
astrbot/core/dynamic_subagent_manager.pyDynamicSubAgentManager._build_shared_context_prompt()Prompt结构
容量管理
shared_context_maxlen时,保留最近90%的消息优势
4.9 主Agent路由提示
功能描述
在主Agent的System Prompt中注入动态SubAgent能力说明,包括配额信息、创建指南和生命周期说明。
实现方式
DynamicSubAgentManager.build_task_router_prompt()注入内容
配额耗尽时:仍然显示代理能力说明和生命周期,但告知无法创建新子代理,仍可委派已有的子代理。
4.10 自动清理
功能描述
每轮对话结束后,自动清理非保护的子代理。
清理规则
remove_subagent()进行清理(移除配置、handoff工具、历史、后台结果)触发时机
在
internal.py的 agent 结束流程中:4.11 数量限制
功能描述
限制单个会话中子代理的最大数量,防止资源耗尽。
替换已存在的同名子代理不增加计数
五、新增的工具列表
5.1 主Agent可用工具
create_dynamic_subagentremove_dynamic_subagentlist_dynamic_subagentsreset_subagentprotect_subagentunprotect_subagentview_shared_contextsend_shared_context_for_main_agentwait_for_subagent5.2 子代理可用工具
send_shared_contextview_shared_context工具,因为公共上下文会自动注入到子代理六、修改文件清单
astrbot/core/dynamic_subagent_manager.pyastrbot/core/astr_agent_tool_exec.py_execute_handoff()(静态/动态Prompt构建、历史注入)、增强_execute_handoff_background()(pending任务创建)、新增_do_handoff_background()(结果存储与智能唤醒逻辑)astrbot/core/astr_main_agent.py_apply_enhanced_subagent_tools()函数,在MainAgentBuildConfig中新增enhanced_subagent配置字段,在build_main_agent()中注入main_agent_runner到AstrAgentContext.extraastrbot/core/astr_agent_context.pyextra字段类型由dict[str, str]改为dict[str, Any]astrbot/core/config/default.pyenhanced_subagent配置块astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.pyastrbot/core/agent/runners/tool_loop_agent_runner.py_handle_function_tools()中集成动态SubAgent工具查找和注册astrbot/core/star/context.pytool_loop_agent()可以返回全部的上下文(通过kwargs.get("runner_messages")判断,用于记录SubAgent历史七、使用流程
7.1 基本使用流程
7.2 带保护的多轮对话流程
7.3 带公共上下文的协作流程
7.4 后台任务并行处理流程
7.5 后台任务与共享上下文结合
八、API 参考
8.1 DynamicSubAgentManager 核心方法
配置与生命周期
configure(max_subagent_count, auto_cleanup_per_turn, shared_context_enabled, shared_context_maxlen, max_subagent_history)is_auto_cleanup_per_turn() -> boolis_shared_context_enabled() -> boolregister_blacklisted_tool(tool_name)register_inherent_tool(tool_name)子代理管理
create_subagent(session_id, config) -> tupleremove_subagent(session_id, agent_name) -> strprotect_subagent(session_id, agent_name)is_protected(session_id, agent_name) -> boolcleanup_session_turn_end(session_id) -> dictcleanup_session(session_id) -> dictget_handoff_tools_for_session(session_id) -> listget_subagent_tools(session_id, agent_name) -> list | None历史管理
update_subagent_history(session_id, agent_name, current_messages)get_subagent_history(session_id, agent_name) -> listclear_subagent_history(session_id, agent_name) -> strPrompt构建
build_task_router_prompt(session_id) -> strbuild_static_subagent_prompts(session_id, agent_name) -> strbuild_dynamic_subagent_prompts(session_id, agent_name, runtime) -> str后台任务管理
create_pending_subagent_task(session_id, agent_name) -> strget_pending_subagent_tasks(session_id, agent_name) -> list[str]get_latest_task_id(session_id, agent_name) -> str | Nonestore_subagent_result(session_id, agent_name, success, result, task_id, error, execution_time, metadata)get_subagent_result(session_id, agent_name, task_id) -> SubAgentExecutionResult | Nonehas_subagent_result(session_id, agent_name, task_id) -> boolclear_subagent_result(session_id, agent_name, task_id)get_subagent_status(session_id, agent_name) -> strget_all_subagent_status(session_id) -> dict公共上下文管理
add_shared_context(session_id, sender, context_type, content, target)get_shared_context(session_id, filter_by_agent) -> listset_shared_context_enabled(session_id, enabled)cleanup_shared_context_by_agent(session_id, agent_name)clear_shared_context(session_id)8.2 SubAgentExecutionResult 数据结构
8.3 DynamicSubAgentConfig 数据结构
8.4 DynamicSubAgentSession 数据结构
Screenshots or Test Results / 运行截图或测试结果
测试1:动态Agent创建和保护机制
测试流程:动态建立若干个SubAgent,给其中一些加入保护,另一些不加保护,观察清理情况,以及跨轮对话的记忆
测试1——动态创建子Agent和保护机制.md
测试2:子Agent历史上下文
测试流程:建立一个SubAgent,并使其跨多轮对话,查看其是否有多轮对话的记忆。
测试2——子Agent历史上下文.md
测试3:子Agent共享上下文
测试流程
send_shared_context发送工具向共享上下文中添加内容测试3——子Agent共享上下文.md
测试4:异步与等待
测试流程
设计一个同时包含并行与串行、前台与后台模式的任务,观察LLM的工作流程
测试4——异步与等待.md
测试5:超长上下文工作实例
测试流程
让agent阅读Astrbot的全部代码,并生成细致的开发文档
Checklist / 检查清单
😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
/ 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
/ 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。
🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in
requirements.txtandpyproject.toml./ 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到
requirements.txt和pyproject.toml文件相应位置。😮 My changes do not introduce malicious code.
/ 我的更改没有引入恶意代码。
Summary by Sourcery
Add a dynamic, enhanced SubAgent system with shared context, history, and management tooling, and wire it into the main agent configuration and execution pipeline.
New Features:
Enhancements:
Documentation:
Tests:
Chores:
Summary by Sourcery
Introduce an enhanced dynamic SubAgent system with runtime-created subagents, shared context, history persistence, and background task management, and wire it into the main agent’s tooling, prompts, and lifecycle.
New Features:
Enhancements:
Tests: