第一阶段、 DeepAgents 框架定位与核心价值 链接到标题

一、 什么是 DeepAgents ? 链接到标题

DeepAgents(代码库名为 deepagents)是一个基于 LangChainLangGraph 构建的企业级高级智能体框架。它建立在 LangGraph(底层运行时)和 LangChain(工具/模型层)之上,是一个高阶的Agent Harness(智能体装备/套件)

  • 定位:它旨在简化长运行自主智能体 (Long-running Autonomous Agents) 的开发过程,通过内置的最佳实践和中间件,解决复杂任务中的规划、记忆、工具使用和环境交互问题。

  • 核心理念:如果说 LangChain 提供了积木,LangGraph 提供了地基,那么 DeepAgents 就是一套成品级的框架。它预设了最佳实践(规划、文件系统、子智能体),让你能快速构建类似 “OpenAI Deep Research” 或 “Claude Code” 的应用。

  • 论文地址:https://arxiv.org/pdf/2510.21618

  DeepAgents 开源地址:https://github.com/langchain-ai/deepagents

二、 解决的核心问题 链接到标题

  传统的 Agent 开发通常运行一个简单的循环:思考→调用工具→观察→重复。这种模式在处理多小时或多天的任务时,容易遇到以下"浅层陷阱"(Shallow Agent Problem)

  1. 规划能力缺失:原生 Agent 倾向于“走一步看一步”,缺乏全局视角的任务拆解,容易在多步任务中迷失方向。

  2. 遗忘与混乱:在执行超过 10-20 步的长任务时,由于 Context Window(上下文窗口)限制,传统 Agent 容易忘记初始目标或陷入死循环。

  3. 环境交互困难:文件系统操作、代码执行环境(沙箱)的配置和安全管理复杂。

  4. 上下文污染:所有工具返回结果都堆积在一个对话历史中,导致噪声过大。

  5. 协作编排复杂:多智能体(Multi-Agent)之间的任务分发和上下文隔离难以实现。

  DeepAgents 通过引入"类人"的工作流解决了这些问题:先做计划(Plan),再执行,利用文件系统管理记忆,遇到复杂子任务时"外包"给子 Agent。将规划工具、文件系统访问、子代理和详细提示词等关键机制整合在一起,以支持复杂的深度任务 。

import deepagents

print(dir(deepagents))

三、 应用场景 链接到标题

DeepAgents 不适合用来做简单的聊天机器人(Chatbot),它是为重任务设计的,它适用于任务需规划、上下文海量、需多专家协作、要求持久记忆的场景,将LangChain生态从单步响应提升至自主完成复杂项目的高度:

  • 深度调研 (Deep Research):自动进行多轮网页搜索、阅读文档、整理笔记并生成长篇报告(如分析某个行业的市场格局)。

  • 全栈代码生成 (Coding Assistant):类似 Claude Code,在沙箱环境中编写、运行、测试和修复代码,甚至重构整个代码库。

  • 复杂数据分析:自动连接数据库,生成 SQL,执行查询,将中间数据存为 CSV 文件(在虚拟文件系统中),最后生成图表。

  • 自动化运维 (DevOps Automation):操作文件系统、执行 Shell 命令、管理服务器状态。

  • 复杂工作流编排:需要多角色协作(如产品经理-程序员-测试员)的复杂业务流程。

  当然这个描述相对来说比较抽象,因此我们这里对适用于DeepAgents的场景进行一个总结:

DeepAgents 适用场景

场景类型能力说明工作逻辑 / 技术特点代表性案例
深度调研与报告生成支持长周期、多步骤、多来源信息整合的研究任务• 自动生成研究计划(Todo)• 调用搜索工具获取资料• 将关键信息写入文件系统(长期记忆)• 使用子代理(Sub-Agents)深入研究子课题• 主代理统一规划、整合结果• LangChain Deep Research 示例(Tavily 搜索 + 多子代理拆分研究)• OpenAI Deep Research(官方生产级深度调研助理)
自动编程与代码助理理解代码、修改代码、生成新文件、执行工具链• 代理可读写虚拟文件系统• 自动分析源码并输出 diff• 人工审批(Human-in-the-Loop)保证安全写入• 调用 Shell / 测试工具执行流程• 可将项目规范写入 /memories 用作长期记忆• LangChain DeepAgents CLI(终端自动编码)• Anthropic Claude Code(深度自动重构与编程)• Manus(多步骤代码智能体)
复杂流程自动化(业务流程 Orchestration)将多个步骤串联为可控流程,适合企业级自动化任务• 任务分解 → 多步骤规划 → 调用不同工具• 搜索、筛选、处理、生成等多环节协作• 使用文件系统存储中间数据(如列表、分析结果等)• 支持多工具、多子任务并行处理• DeepAgents 求职助手(职位搜索 → 筛选排序 → 求职信生成 → 打包结果)• 企业场景如:自动生成分析报告 / 客服知识库构建 / 数据采集+处理流

第二阶段、 与 LangChain 及 LangGraph 的区别 链接到标题

  从技术定位看,LangChain 适用于需要自定义提示与工具的基础智能体搭建;LangGraph 更适合构建复杂的多智能体系统与工作流;而 DeepAgents 面向希望省去底层开发、直接采用深度自主机制的用户,可快速实现 AutoGPT 类应用。因此,DeepAgents 本质上是基于 LangChain 的深度模式封装——它并非替代 LangChain 或 LangGraph,而是将其常用抽象与运行时封装为开箱即用的组件,可视为一层“开发加速器”。

特性LangChainLangGraphDeepAgents
层级基础组件库 (Foundation)编排引擎 (Orchestration)应用框架 (Application Framework)
核心抽象Chain, Runnable, ToolStateGraph, Node, EdgeDeepAgent, Middleware, Backend
灵活性极高 (积木块)高 (自定义图结构)中 (Opinionated / 约定优于配置)
开箱即用低 (需自行组装)中 (需定义图逻辑)高 (内置规划、文件系统、子代理)
适用对象库开发者/底层构建复杂流程序列化开发者应用开发者/企业级解决方案
  • LangChain:提供 Prompt, Models, Tools 等积木。

  • LangGraph:提供 State, Nodes, Edges 等地基和连接逻辑。

  • DeepAgentsLangGraph 的一种“最佳实践实现”。它底层使用 LangGraph 来管理状态和循环,但向上提供了更高级的 API (create_deep_agent),隐藏了底层的图构建细节。

第三阶段、 DeepAgents 核心功能介绍 链接到标题

  • 注意: 没有学习LangChain的用户,建议先学习LangChain1.0的基础内容,再学习DeepAgents。

一、安装环境与依赖 链接到标题

!python --version
# 安装deepagents依赖
!pip install deepagents
# 查看版本
!pip list | grep -E 'langchain|deepagents'
# from langchain_openai import ChatOpenAI
from langchain_deepseek import ChatDeepSeek
from dotenv import load_dotenv

# 1. 加载.env环境变量
load_dotenv(override=True)

# 2. 初始化模型
model = ChatDeepSeek(model="deepseek-chat", temperature=0)
model.invoke("你好")

二、 核心入口:create_deep_agent() 链接到标题

这是整个框架的核心函数,它创建了一个功能完整的深度智能体。

默认配置

  • 使用 Claude Sonnet 4 或 GPT-4o 作为默认模型(推荐)。

  • 集成 7 个核心文件操作工具。

  • 提供待办事项管理功能。

  • 支持子代理调用。

关键参数

  • model: 支持自定义语言模型。

  • tools: 自定义工具集。

  • system_prompt : 系统提示词

  • subagents: 子代理配置。

  • backend: 文件存储后端。

  • interrupt_on: 人机交互配置 (Human-in-the-Loop)。允许在特定节点暂停 Agent 执行,等待人工干预。这对于安全审核(删除文件)、成本控制(调用昂贵 API)和质量保证至关重要。

from deepagents import create_deep_agent

create_deep_agent?
# 安装网络搜索工具
!pip install langchain-tavily
from deepagents import create_deep_agent
from langchain_tavily import TavilySearch
from langgraph.checkpoint.memory import InMemorySaver

# 1. 初始化 Tavily 搜索工具
tavily = TavilySearch(max_results=3)

# 2. 编写系统提示词
research_instructions = """
您是一位资深的研究人员。您的工作是进行深入的研究,然后撰写一份精美的报告。
您可以通过互联网搜索引擎作为主要的信息收集工具。
## 可用工具
### `互联网搜索`
使用此功能针对给定的查询进行互联网搜索。您可以指定要返回的最大结果数量、主题以及是否包含原始内容。
### `写入本地文件`
使用此功能将研究报告保存到本地文件。当您完成研究并生成报告后,请使用此工具将完整的报告内容保存到文
件中。
- 文件路径建议使用 .md 格式(Markdown),例如 "research_report.md" 或 "./reports/报告名
称.md"
- 请确保报告内容完整、结构清晰,包含所有章节和引用来源
## 工作流程
在进行研究时:
1. 首先将研究任务分解为清晰的步骤
2. 使用互联网搜索来收集全面的信息
3. 将信息整合成一份结构清晰的报告
4. **重要**:完成报告后,务必使用 `写入本地文件` 工具将完整报告保存到本地文件
5. 务必引用你的资料来源
**注意**:请确保在完成研究后,将完整的报告内容保存到文件中,这样用户可以方便地查看和保存报告。
"""

# 2. 创建 DeepAgents 智能体
agent = create_deep_agent(
    name="DeepAgents_Agent",       # 智能体名称
    tools=[tavily],                # 可调用工具Tool
    model=model,                   # 模型Model
    system_prompt=research_instructions,  # 系统提示词
    checkpointer=InMemorySaver(),  # 检查点Checkpointer,内存检查点
)

# 3. 配置线程 ID
config = {"configurable": {"thread_id": "1"}}

result = agent.invoke({"messages": [{"role": "user", "content": "帮我查询一下有关deepagents框架的最新动态"}]}, config=config)
print(result["messages"][-1].content)

三、 create_deep_agent 内部结构 链接到标题

  • 源码参数截图

# 安装美化代码库Rich
!pip install rich
# 导入Rich库,用于美化代码
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
RICH_AVAILABLE = True
console = Console()

def print_agent_tools(agent):
    """
    打印 Agent 中加载的所有工具
    包括用户自定义工具、文件系统工具、系统工具等
    """
    # 获取 agent 的 nodes (LangGraph 的节点)
    if hasattr(agent, 'nodes') and 'tools' in agent.nodes:
        tools_node = agent.nodes['tools']

        # tools_node 是 PregelNode,真正的 ToolNode 在 bound 属性中
        if hasattr(tools_node, 'bound'):
            tool_node = tools_node.bound

            # 从 ToolNode 获取工具
            if hasattr(tool_node, 'tools_by_name'):
                tools = tool_node.tools_by_name

                # 分类工具
                user_tools = []
                filesystem_tools = []
                system_tools = []

                for tool_name, tool in tools.items():
                    tool_info = {
                        'name': tool_name,
                        'description': getattr(tool, 'description', '无描述')
                    }

                    # 分类
                    if tool_name in ['ls', 'read_file', 'write_file', 'edit_file', 'glob', 'grep', 'execute']:
                        filesystem_tools.append(tool_info)
                    elif tool_name in ['write_todos', 'task']:
                        system_tools.append(tool_info)
                    else:
                        user_tools.append(tool_info)

                # 打印加载工具的输出
                _print_tools_rich(user_tools, filesystem_tools, system_tools)

            else:
                print("无法获取工具列表 (tools_by_name 不存在)")
        else:
            print("无法获取工具列表 (bound 属性不存在)")
    else:
        print("无法获取工具列表 (nodes 结构不符合预期)")

def _print_tools_rich(user_tools, filesystem_tools, system_tools):
    """使用 Rich 库美化打印工具列表"""
    console.print()

    # 创建表格
    table = Table(title="Agent 加载的工具列表", show_header=True, header_style="bold magenta")
    table.add_column("类别", style="cyan", width=20)
    table.add_column("工具名称", style="green", width=20)
    table.add_column("描述", style="white", width=60)

    # 添加用户工具
    for i, tool in enumerate(user_tools):
        category = "用户工具" if i == 0 else ""
        desc = tool['description'][:80] + "..." if len(tool['description']) > 80 else tool['description']
        table.add_row(category, tool['name'], desc)

    # 添加文件系统工具
    for i, tool in enumerate(filesystem_tools):
        category = "文件系统工具" if i == 0 else ""
        desc = tool['description'][:80] + "..." if len(tool['description']) > 80 else tool['description']
        table.add_row(category, tool['name'], desc)

    # 添加系统工具
    for i, tool in enumerate(system_tools):
        category = "系统工具" if i == 0 else ""
        desc = tool['description'][:80] + "..." if len(tool['description']) > 80 else tool['description']
        table.add_row(category, tool['name'], desc)

    console.print(table)

    # 打印统计
    total = len(user_tools) + len(filesystem_tools) + len(system_tools)
    console.print(Panel(
        f"[bold green]共计 {total} 个工具[/bold green]\n\n"
        f"• 用户工具: {len(user_tools)}\n"
        f"• 文件系统工具: {len(filesystem_tools)}\n"
        f"• 系统工具: {len(system_tools)} 个",
        title="统计信息",
        border_style="green"
    ))
    console.print()
print_agent_tools(agent)

这里可以看到,除了自己定义的工具(如 Tavily 搜索),DeepAgents 还默认添加了一些其他工具:

  • 文件系统中间件(FileSystemMiddleware): 用于读写、查询、执行文件系统中的文件。

  • 待办事项中间件(TodoListMiddleware): write_todos 用于写入待办事项,task 用于创建子agent来执行待办事项。

这些都是 DeepAgents 特有的功能,用于支持智能体在实际应用中的各种场景。那么,接下来我们来看看这些功能的具体应用。

  • Langgraph Studio 中可视化结构图:能看到

    • PatchToolCallsMiddleware用于 自动检测并修复“悬空”的工具调用 的关键中间件,确保工具调用的完整性和正确性。

    • SummarizationMiddleware上下文压缩中间件,防止上下文过长

第四阶段、 四大核心内置工具与组件详解 链接到标题

  DeepAgents 通过中间件 (Middleware) 的形式,为智能体注入了四项核心能力,构成了框架的四大支柱(Four Pillars)

  DeepAgents四大内置工具通过角色分离、状态贯通、成本优化的设计哲学,将长周期Agent的开发复杂度降低70%以上,同时通过LangGraph运行时保障生产级可靠性。其核心价值在于将原本需要手动编排的规划-存储-委托-执行流程,固化为中心化、可复用、可观测的中间件体系,标志着AI Agent从"脚本化"向"产品化"的关键演进。

  • 协同价值:系统提示词确保质量,规划工具保证进度,文件系统实现数据共享,子代理隔离分析风险,四者形成高可靠、可追溯、可恢复的完整闭环。
维度系统提示词 (System Prompt)规划工具 (Planning Tool)文件系统 (File System)子代理 (Sub Agents)
角色定位行为总导演:定义Agent的"世界观"与工具使用范式,确保三大中间件协同不偏离目标任务架构师:将模糊需求转化为可执行、可追踪、可动态调整的结构化任务蓝图上下文仓库:虚拟化存储引擎,解决长任务中的信息溢出与状态持久化难题执行特派员:实现上下文隔离与专业分工,防止主Agent因深层递归导致状态混乱
核心功能内置Claude Code风格指令,涵盖规划逻辑、文件操作规范、子代理调用协议;支持场景化自定义覆盖write_todos: 生成带优先级/依赖关系的JSON任务列表read_todos: 实时查询任务执行进度与状态ls/glob: 文件浏览与模式匹配read/write/edit: CRUD操作grep: 内容检索execute: 沙箱命令执行task: 动态生成同构或异构子Agent支持独立上下文窗口与工具集配置结果通过文件系统回传
技术实现字符串模板,在create_deep_agent时注入;默认提示词约2000 tokens,包含ReAct循环与三大中间件调用示例TodoListMiddleware:拦截LLM输出中的todo_list字段,解析为agent_state.todos字典,状态变更触发图节点重计算FilesystemMiddleware:基于LangGraph State的files字段实现内存级虚拟文件系统,大工具结果(>2KB)自动触发write_file落盘SubAgentMiddleware:将task调用编译为独立的StateGraph子图,通过命名空间隔离状态,父图通过files读取子图输出
状态管理静态配置,单次会话内不可变;可通过configurable_agent实现热更新动态状态机:每个todo含id/description/status/priority/dependencies字段,执行后状态从pendingcompleted,支持update_todos动态调整持久化存储:默认存储在LangGraph State,支持切换StateBackend(内存/Redis/Postgres)实现跨会话文件共享完全隔离:子Agent拥有独立的messagesfiles命名空间,异常不会污染父Agent状态;支持max_iterations限制防止无限递归
使用场景垂直领域定制:金融研究/医疗诊断等需强化专业约束的场景② 多Agent协作:统一多个子Agent的行为规范③ 安全合规:注入数据脱敏、权限检查等硬性规则长周期研究:自动拆解为文献检索→数据收集→分析→撰写的阶段性任务② 故障恢复:崩溃后通过read_todos快速定位断点续跑③ 动态重规划:执行中发现信息不足时新增补充任务大结果处理:搜索返回100KB内容自动落盘,避免上下文溢出② 知识沉淀:中间分析结果写入文件供后续步骤复用③ 多Agent数据共享:父Agent与子Agent通过文件交换数据,无需序列化传递高风险操作隔离:网页抓取/代码执行等易失败任务委托给子Agent② 专业化分工:主Agent负责任务编排,子Agent专注领域执行(如专门的数据分析Agent)③ 资源优化:子Agent可使用轻量化模型,降低整体token成本

一、 系统提示词 (System Prompts) 链接到标题

  • 功能:定义 Agent 的“人设”、行为准则和核心目标。

  • 机制:框架会自动将用户定义的 system_prompt 与内置的 BASE_AGENT_PROMPT 结合。

  • 作用:确保 Agent 始终遵循指令,理解其可用的工具集,并保持一致的输出风格。

  • 角色本质:系统提示词是DeepAgents三大中间件协同的"契约",其默认版本包含:

    • 规划指令:要求LLM在任务开始前必须调用write_todos,输出格式为JSON Schema

    • 文件操作规范:明确write_file用于新内容,edit_file用于局部修改,避免覆盖冲突

    • 子代理调用协议:规定task工具的参数结构及结果通过/subagent_results/.md回传

    • 安全底线:禁止直接执行删除、格式化等危险命令,必须通过execute沙箱

FileSystem_System_prompt 文件系统提示词 链接到标题

Execute_Tool_Descriptition 执行工具提示词 链接到标题

  • 文件系统中间件提示词(FileSystemMiddleWare)

  • 文件路径:deepagents/middleware/filesystem.py

Write_Todos_System_prompt 写执行任务提示词 链接到标题

  • TodoList中间件提示词(TodoListMiddleware)

  • 文件路径:deepagents/middleware/todo.py

Task_System_prompt Task执行器提示词 链接到标题

  • 子智能体中间件提示词(SubAgentMiddleware)

  • 文件路径:deepagents/middleware/subagent.py

二、 规划工具 (Planning System / Todo List) 链接到标题

  • 组件TodoListMiddleware

  • 工具名write_todos

  • 功能:Agent 在行动前先生成 Markdown 格式的待办事项列表 (Todo List),并在执行过程中更新状态(完成/进行中)。

  • 工作流

    1. Agent 接收复杂任务(简单短期的任务不会触发todolist)。

    2. 调用 write_todos 将任务拆解为子步骤 (Pending)。

    3. 每完成一步,更新状态为 (Completed)。

    4. 自我反思:在每一步行动前,Agent 都会看到当前的 Todo List,从而避免迷失方向。这强制模型进行"思维链"的显性化管理。

  • 类似编程工具中的待执行项
import json
from rich.json import JSON

def debug_agent(query: str, save_to_file: str = None):
    """
    运行智能体并打印中间过程(使用 Rich 美化输出)
    参数:
    query: 用户查询
    save_to_file: 保存最终输出到文件(可选)
    返回:
    str: 最终的研究报告
    """
    console.print(Panel.fit(
        f"[bold cyan]查询:[/bold cyan] {query}",
        border_style="cyan"
    ))
    step_num = 0
    final_response = None

    config = {"configurable": {"thread_id": "2"}}

    # 实时流式输出
    for event in agent.stream(
        {"messages": [{"role": "user", "content": query}]},
        stream_mode="values",
        config=config
    ):
        step_num += 1

        console.print(f"\n[bold yellow]{'─' * 80}[/bold yellow]")
        console.print(f"[bold yellow]步骤 {step_num}[/bold yellow]")
        console.print(f"[bold yellow]{'─' * 80}[/bold yellow]")

        if "messages" in event:
            messages = event["messages"]
            if messages:
                msg = messages[-1]
                # 保存最终响应
                if hasattr(msg, 'content') and msg.content and not hasattr(msg,'tool_calls'):
                    final_response = msg.content
                # AI 思考
                if hasattr(msg, 'content') and msg.content:
                    # 如果内容太长,只显示前300字符作为预览
                    content = msg.content
                    if len(content) > 300 and not (hasattr(msg, 'tool_calls') and msg.tool_calls):
                        preview = content[:300] + "..."
                        console.print(Panel(
                            f"{preview}\n\n[dim](内容较长,完整内容将在最后显示)[/dim]",
                        title="[bold green]AI 思考[/bold green]",
                        border_style="green"
                        ))
                    else:
                        console.print(Panel(
                            content,
                            title="[bold green]AI 思考[/bold green]",
                            border_style="green"
                        ))
                # 工具调用
                if hasattr(msg, 'tool_calls') and msg.tool_calls:
                    for tool_call in msg.tool_calls:
                        tool_info = {
                            "工具名称": tool_call.get('name', 'unknown'),
                            "参数": tool_call.get('args', {})
                        }
                        console.print(Panel(
                            JSON(json.dumps(tool_info, ensure_ascii=False)),
                            title="[bold blue]工具调用[/bold blue]",
                            border_style="blue"
                        ))
                # 工具响应
                if hasattr(msg, 'name') and msg.name:
                    response = str(msg.content)[:500]
                    if len(str(msg.content)) > 500:
                        response += f"\n... (共 {len(str(msg.content))} 字符)"

                    console.print(Panel(
                        response,
                        title=f"[bold magenta]工具响应: {msg.name}[/bold magenta]",
                        border_style="magenta"
                    ))
    console.print("\n[bold green]任务完成![/bold green]\n")
    return final_response

print("调试函数已创建")
# 示例:使用调试函数运行研究任务
query = "详细调研 LangChain DeepAgents 框架的核心特性,并写一份结构化的总结报告。"

# 使用调试函数)
result = debug_agent(query)

三、 子代理 (Sub-Agent Delegation) 链接到标题

  • 组件SubAgentMiddleware

  • 工具名task (delegate_task)

  • 核心概念

    • 任务隔离:每个子代理有独立的上下文窗口。

    • 并行执行:支持同时启动多个子代理。

    • 结果聚合:智能整合多个子代理的输出。

  • 机制

    • 当任务过于具体(如"爬取并分析这篇长论文")时,主 Agent 会生成一个隔离环境的子 Agent 去执行。

    • 子 Agent 启动时,只继承必要的环境配置,但拥有全新的、空白的消息历史

    • 子 Agent 执行完毕后只返回一个总结性的结果。这保证了主 Agent 的时间线(Context)保持干净,极大地节省了 Token。

自动触发默认的 SubAgentMiddleware 链接到标题

import asyncio
import os
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.tree import Tree
from deepagents import create_deep_agent
from langchain_openai import ChatOpenAI
from langchain_community.tools import TavilySearchResults

from langchain_core.messages import ToolMessage, BaseMessage

# 加载环境变量
load_dotenv(override=True)

# 配置 Rich Console
console = Console()

async def run_auto_subagent_demo():
    """
    演示:不显式传入 subagents 参数,自动触发默认的 SubAgentMiddleware
    """
    console.print(Panel.fit("[bold magenta]DeepAgents 自动 SubAgent 中间件演示[/bold magenta]", border_style="magenta"))
    console.print("[dim]本演示验证:即使不传入 subagents 参数,Agent 默认也会启用 'general-purpose' 子 Agent。[/dim]")

    # 1. 初始化模型
    # 使用 GPT-4o 以确保对复杂指令的理解和工具调用的准确性
    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    # 2. 定义工具
    # 给 Agent 一些工具,以便子 Agent 也有工具可用
    tools = [TavilySearchResults(max_results=2)]

    # 3. 创建 Agent (不传入 subagents)
    # create_deep_agent 默认会加载 SubAgentMiddleware(general_purpose_agent=True)
    # 这意味着 Agent 会自动获得一个名为 'task' 的工具,可以调用 'general-purpose' 子 Agent
    console.print("[bold cyan]正在创建 Agent (subagents=None)...[/bold cyan]")

    agent = create_deep_agent(
        model=llm,
        tools=tools,
        # subagents=[], # 故意不传或传空
        system_prompt="""你是一个能够高效处理并发任务的智能助手。
                        对于包含多个独立部分的复杂任务,你必须使用 'task' 工具来创建 'general-purpose' 子 Agent 进行处理。
                        不要自己在主线程中串行执行所有操作。利用子 Agent 来隔离上下文并提高效率。"""
    )

    # 4. 定义一个适合并行/隔离的任务
    task = """请同时调研以下两个完全不同的主题,并分别给出简短总结:
            1. Python 语言的历史起源。
            2. Rust 语言的内存安全机制。
            请务必使用子 Agent 分别处理这两个任务。"""

    console.print(f"\n[bold green]用户任务:[/bold green] {task}\n")

    # 5. 运行并可视化
    step = 0

    # 检查 API Key
    if not os.getenv("OPENAI_API_KEY"):
        console.print("[bold red]❌ 错误: 未找到 OPENAI_API_KEY,请检查 .env 文件[/bold red]")
        return

    console.print("[dim]开始流式输出...[/dim]")
    try:
        async for event in agent.astream({"messages": [("user", task)]}):
            step += 1
            # 遍历所有节点输出 (e.g., 'agent', 'tools')
            for node_name, node_data in event.items():
                if node_data is None:
                    continue

                if "messages" in node_data:
                    msgs = node_data["messages"]
                    # 确保是列表
                    if not isinstance(msgs, list):
                        msgs = [msgs]

                    for msg in msgs:
                        # 0. 过滤非消息对象
                        if not isinstance(msg, BaseMessage):
                            continue

                        # 1. 检测工具调用 (期望看到 'task' 工具)
                        if hasattr(msg, "tool_calls") and msg.tool_calls:
                            tree = Tree(f"[bold yellow]Step {step}: 决策与调用 (Node: {node_name})[/bold yellow]")
                            for tc in msg.tool_calls:
                                tool_name = tc['name']
                                tool_args = tc['args']

                                if tool_name == "task":
                                    # 验证成功!
                                    branch = tree.add(f"[bold red]🚀 触发 'task' 工具 (Sub-Agent)[/bold red]")
                                    branch.add(f"[cyan]子 Agent 类型:[/cyan] {tool_args.get('subagent_type')}")
                                    branch.add(f"[cyan]任务指令:[/cyan] {tool_args.get('description')}")
                                else:
                                    tree.add(f"[blue]普通工具调用:[/blue] {tool_name}")

                            console.print(tree)

                        # 2. 检测工具输出 (Sub-Agent 的返回结果)
                        elif isinstance(msg, ToolMessage):
                            if msg.name == "task":
                                # Sub-Agent 完成任务返回
                                panel = Panel(
                                    msg.content,
                                    title=f"[bold magenta]Sub-Agent 完成任务 (Node: {node_name})[/bold magenta]",
                                    border_style="magenta"
                                )
                                console.print(panel)
                            else:
                                console.print(f"[dim]Tool Output ({msg.name}): {msg.content[:100]}...[/dim]")

                        # 3. 检测 AI 最终回复
                        elif msg.content and not msg.tool_calls:
                            title = f"[bold green]Agent 回复 (Node: {node_name})[/bold green]"
                            console.print(Panel(msg.content, title=title, border_style="green"))

    except Exception as e:
        console.print(f"[bold red]❌ 运行时错误: {e}[/bold red]")

    console.print("\n[bold magenta]演示结束[/bold magenta]")

if __name__ == "__main__":
    await run_auto_subagent_demo()

显示传入subAgent参数 链接到标题

# 安装 MCP 适配器(关键依赖)\MCP 服务器开发库(如需自定义工具)
#!pip install langchain-mcp-adapters mcp

检查 Node.js

  • node –version

检查 npm/npx

  • npx –version

手动安装 MCP 服务器包

  • npm install -g @amap/amap-maps-mcp-server
import asyncio
import os
from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.tree import Tree
from deepagents import create_deep_agent
from langchain_deepseek import ChatDeepSeek
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.tools import TavilySearchResults

from langchain_core.messages import ToolMessage, BaseMessage

# 加载环境变量
load_dotenv(override=True)

# 1. 配置 Rich Console
console = Console()

# 2. 配置 Context7 MCP (连接官方文档)
async def setup_mcp_tools():
    console.print("[dim]正在连接 Context7 MCP 服务器...[/dim]")
    # 检查 node 环境
    try:
        client = MultiServerMCPClient({
            "context7": {
                "transport": "stdio",
                "command": "npx",
                "args": ["-y", "@upstash/context7-mcp@latest"],
            }
        })
        # 获取工具
        tools = await client.get_tools()
        console.print(f"[green]成功加载 {len(tools)} 个 MCP 工具[/green]")
        return client, tools
    except Exception as e:
        console.print(f"[red]连接 MCP 失败: {e}[/red]")
        console.print("[yellow]将使用模拟工具继续...[/yellow]")
        return None, []

# 3. 定义子 Agent 配置
def get_subagents_config(mcp_tools):
    # 子 Agent 1: 官方文档专家
    doc_tools = mcp_tools if mcp_tools else [TavilySearchResults(max_results=3)]

    docs_researcher = {
        "name": "DocsResearcher",
        "description": "负责查阅官方文档和技术规范的专家 Agent。",
        "system_prompt": "你是一名专门查阅官方文档的技术专家。请使用工具获取准确的技术细节。不要猜测。",
        "tools": doc_tools,
        "model": "deepseek-chat"
    }

    # 子 Agent 2: 社区生态专家
    community_researcher = {
        "name": "CommunityResearcher",
        "description": "负责搜索社区博客、教程和最佳实践的专家 Agent。",
        "system_prompt": "你是一名关注社区动态的开发者。请搜索博客、论坛和 GitHub 讨论。",
        "tools": [TavilySearchResults(max_results=3)],
        "model": "deepseek-chat"
    }

    return [docs_researcher, community_researcher]

# 4. 主运行逻辑
async def run_parallel_demo():
    console.print(Panel.fit("[bold blue]DeepAgents 并行子 Agent 演示[/bold blue]", border_style="blue"))

    # 初始化 MCP
    mcp_client, mcp_tools = await setup_mcp_tools()

    # 获取子 Agent 配置
    subagents = get_subagents_config(mcp_tools)

    # 创建主 Agent
    llm = ChatDeepSeek(model="deepseek-chat", temperature=0)

    agent = create_deep_agent(
        model=llm,
        tools=[],
        subagents=subagents,        # 传入子 Agent 配置
        system_prompt="""你是一名技术总监。你的任务是协调 DocsResearcher 和 CommunityResearcher 完成调研任务。
                        请根据用户需求,将任务拆解并分发给这两个子 Agent。
                        如果任务允许,请务必并行调用它们以提高效率。
                        最后汇总它们的报告。"""
    )

    task = "请详细调研 'LangChain DeepAgents' 框架。我需要官方的技术架构说明(来自文档)以及社区的最佳实践案例。请对比两者。"

    console.print(f"\n[bold green]任务指令:[/bold green] {task}\n")

    # 运行并可视化
    step = 0
    try:
        async for event in agent.astream({"messages": [("user", task)]}):
            step += 1
            # 遍历所有节点输出 (e.g., 'agent', 'tools')
            for node_name, node_data in event.items():
                if node_data is None:
                    continue

                if "messages" in node_data:
                    msgs = node_data["messages"]
                    # 确保是列表
                    if not isinstance(msgs, list):
                        msgs = [msgs]

                    for msg in msgs:
                        # 0. 过滤非消息对象
                        if not isinstance(msg, BaseMessage):
                            continue

                        # 1. 检测工具调用 (期望看到 'task' 工具)
                        if hasattr(msg, "tool_calls") and msg.tool_calls:
                            tree = Tree(f"[bold yellow]Step {step}: 决策与调用 (Node: {node_name})[/bold yellow]")
                            for tc in msg.tool_calls:
                                tool_name = tc['name']
                                tool_args = tc['args']

                                if tool_name == "task":
                                    # 验证成功!
                                    branch = tree.add(f"[bold red]🚀 触发 'task' 工具 (Sub-Agent)[/bold red]")
                                    branch.add(f"[cyan]子 Agent 类型:[/cyan] {tool_args.get('subagent_type')}")
                                    branch.add(f"[cyan]任务指令:[/cyan] {tool_args.get('description')}")
                                else:
                                    tree.add(f"[blue]普通工具调用:[/blue] {tool_name}")

                            console.print(tree)

                        # 2. 检测工具输出 (Sub-Agent 的返回结果)
                        elif isinstance(msg, ToolMessage):
                            if msg.name == "task":
                                # Sub-Agent 完成任务返回
                                panel = Panel(
                                    msg.content,
                                    title=f"[bold magenta]Sub-Agent 完成任务 (Node: {node_name})[/bold magenta]",
                                    border_style="magenta"
                                )
                                console.print(panel)
                            else:
                                console.print(f"[dim]Tool Output ({msg.name}): {msg.content[:100]}...[/dim]")

                        # 3. 检测 AI 最终回复
                        elif msg.content and not msg.tool_calls:
                            title = f"[bold green]Agent 回复 (Node: {node_name})[/bold green]"
                            console.print(Panel(msg.content, title=title, border_style="green"))

    except Exception as e:
        console.print(f"[bold red]❌ 运行时错误: {e}[/bold red]")


    console.print("\n[bold blue]演示结束[/bold blue]")

# 运行演示
if __name__ == "__main__":
    await run_parallel_demo()

第四阶段、 文件系统集成 (Filesystem & Sandbox) 链接到标题

  • 组件FilesystemMiddleware & Backend

  • 功能:这是 DeepAgent 的核心"外挂大脑"。Agent 不再将所有检索到的长文塞入上下文(Context),而是将其写入 Filesystem(如 /workspace),仅在需要时读取。这极大地扩展了 Agent 的"工作记忆"。

  • 核心工具集

    • ls: 浏览目录结构。

    • read_file: 读取文件内容。关键封装:支持 offset(偏移量)和 limit(行数),强制 Agent 对大文件进行分页读取,避免一次性读爆上下文。

    • write_file: 创建新文件。

    • edit_file: 修改文件。支持精确的字符串替换和 replace_all 模式,并有防呆设计(要求先读后改)。

    • glob: 通配符模糊查找文件。

    • grep: 正则表达式搜索文件内容,像命令行一样在代码库中定位目标。

    • execute: 执行 Shell 命令(需 Sandbox 支持),用于运行代码、安装依赖等。

  • 亮点功能:大结果自动转存

    • 当 Agent 调用工具产生的结果过长时,系统会自动拦截,将完整结果写入文件系统(如 /large_tool_results/{id}),并只给 Agent 返回一个摘要和文件路径。这完美解决了搜索结果或日志文件过长导致 Agent 崩溃的问题。
  • 后端支持 (Backends)

    • FilesystemBackend: 直接操作本地磁盘。

    • DockerBackend: 在 Docker 容器中执行,提供隔离环境。

    • StoreBackend: 存储在数据库中,支持查询和检索。

    • E2BBackend: 使用 E2B 云端沙箱。

    • CompositeBackend: 混合模式(如:本地存文件,Docker 跑代码)。

一、 核心工具集测试 链接到标题

# --- FilesystemMiddleware 工具测试 ---
print("\n" + "="*50)
print("🚀 开始测试 FilesystemMiddleware 工具集")
print("="*50 + "\n")

from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv

# 1. 初始化环境
load_dotenv(override=True)
model = ChatOpenAI(model="gpt-4o", temperature=0)

# 使用本地文件系统后端,根目录设为 ./workspace
# 这样我们可以看到真实文件的创建
# 注意:设置 virtual_mode=True 以支持绝对路径 (如 /hello_world.py) 映射到 ./workspace
backend = FilesystemBackend(root_dir="./workspace", virtual_mode=True)

# 创建 Agent
# 注意:create_deep_agent 默认会自动包含 FilesystemMiddleware
agent = create_deep_agent(
    model=model,
    backend=backend,
    system_prompt="你是一个文件系统操作助手。请根据用户指令使用相应的工具。"
)

# 辅助函数:运行并打印结果
def run_test(task_name, instruction):
    print(f"\n🔹 [测试: {task_name}]")
    print(f"指令: {instruction}")
    try:
        # 使用 invoke 而不是 stream 以简化输出
        result = agent.invoke({"messages": [("user", instruction)]})
        last_msg = result["messages"][-1]
        print(f"🤖 Agent 回复: {last_msg.content}")

        # 打印工具调用详情 (如果有)
        for msg in result["messages"]:
            if hasattr(msg, 'tool_calls') and msg.tool_calls:
                for tool in msg.tool_calls:
                    print(f"🛠️  调用工具: {tool['name']} args={tool['args']}")
    except Exception as e:
        print(f"❌ 发生错误: {e}")

write_file 测试 链接到标题

# 2. 依次测试各个工具

# 案例 1: write_file
# 创建一个 Python 脚本文件
run_test("write_file",
         "请在当前目录下创建一个名为 'hello_world.py' 的文件,内容是:\nprint('Hello from DeepAgents!')")

ls 测试 链接到标题

# 案例 2: ls
# 查看目录内容,确认文件创建成功
run_test("ls", "请列出当前目录下的所有文件,确认 hello_world.py 是否存在。")

read_file 测试 链接到标题

# 案例 3: read_file
# 读取刚才创建的文件内容
run_test("read_file", "请读取 'hello_world.py' 的内容并展示给我。")

edit_file 测试 链接到标题

# 案例 4: edit_file
# 修改文件内容
run_test("edit_file",
         "请修改 'hello_world.py' 文件,将 print 内容改为 'Hello from Modified File!'。")

grep 测试 链接到标题

# 案例 5: grep
# 搜索文件中的特定字符串
run_test("grep", "请在当前目录下搜索包含 'Modified' 字符串的文件。")

glob 测试 链接到标题

# 案例 6: glob
# 使用通配符查找文件
run_test("glob", "请找出当前目录下所有的 .py 文件。")

execute 测试 链接到标题

# 案例 7: execute
# 尝试执行脚本
# 注意:execute 工具通常需要 SandboxBackend 支持。
# 如果当前使用的 FilesystemBackend 不支持执行,Agent 会收到错误提示或无法调用。
print("\n🔹 [测试: execute]")
print("指令: 尝试运行 hello_world.py 脚本")
try:
    # 我们尝试强行要求 Agent 运行,看它如何反应
    # 如果没有 execute 工具,Agent 可能会说无法执行
    response = agent.invoke({"messages": [("user", "请使用 execute 工具运行 python hello_world.py")]})
    print(f"🤖 Agent 回复: {response['messages'][-1].content}")
except Exception as e:
    print(f"⚠️ 测试说明: execute 工具可能不可用 (取决于 Backend 支持): {e}")

print("\n" + "="*50)
print("✅ 测试结束")

  能看到在FilesystemBackend下,没有execute方法,因为FilesystemBackend是一个文件系统后端,不支持执行Shell命令。接下来,我们测试一下其他后端。

二、Backend 后端应用 链接到标题

1、默认模式(内存沙箱) 链接到标题

  • 内存沙箱,支持在内存中执行代码,防止代码注入攻击。代码执行结果会被存储在内存中,不会对本地环境造成影响。执行结束后,内存中的数据会被清除。
from deepagents import create_deep_agent
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI

# 加载环境变量
load_dotenv(override=True)

# 定义 LLM
model = ChatOpenAI(model="gpt-4o", temperature=0)

# 默认情况下,DeepAgent 会自动加载 FilesystemMiddleware 并使用内存后端
agent = create_deep_agent(
    model=model,
    system_prompt="你是一个数据处理助手。"
)

# Agent 可以自由创建文件、读取文件,但这些文件只存在于内存中
# 任务结束后,这些文件会自动消失

agent.invoke({"messages": [("user", "创建一个名为 test.txt 的文件并写入 'Hello, World!'")]})
result = agent.invoke({"messages": [("user", "读取 test.txt 文件的内容")]})
print(result["messages"][-1].content)

2、持久化模式(操作真实文件) 链接到标题

  • 持久化沙箱,支持将代码执行结果持久化到本地磁盘,防止代码执行过程中数据丢失。代码执行结束后,内存中的数据会被清除,但是磁盘上的文件会被保留。
from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o", temperature=0)

# 将 Agent 的根目录映射到本地的 "./workspace" 文件夹,virtual_mode=True 表示启用虚拟文件系统,作用是将 Agent 执行的所有文件操作都映射到本地的 "./workspace" 文件夹,而不是直接操作本地文件系统。
backend = FilesystemBackend(root_dir="./workspace", virtual_mode=True)

agent = create_deep_agent(
    model=model,
    backend=backend  # 传入后端,中间件会自动使用它
)

# 此时 Agent 执行 write_file("/readme.md", ...) 会在本地 ./workspace/readme.md 创建文件
agent.invoke({"messages": [("user", "创建一个名为 readme.md 的文件并写入 'Hello, World!'")]})
result = agent.invoke({"messages": [("user", "读取 readme.md 文件的内容")]})
print(result["messages"][-1].content)

3、演示大文件读取分页 链接到标题

  • 在FilesystemBackend分页的参数是固定死的,默认每次读取500行,我们可以在系统提示词中指定每次读取的行数。
import asyncio
import os
import shutil
from pathlib import Path
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, ToolMessage
from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend

load_dotenv(override=True)

# 定义工作目录
WORK_DIR = Path("workspace/pagination_demo").resolve()
LARGE_FILE_NAME = "server_logs.txt"
LARGE_FILE_PATH = WORK_DIR / LARGE_FILE_NAME
TARGET_SECRET = "CRITICAL_ERROR_CODE_998877"

async def run_pagination_demo():
    print("\n" + "="*80)
    print("DeepAgents FilesystemMiddleware 分页读取演示")
    print("="*80)

    # 2. 初始化 FilesystemBackend
    # 将 backend 指向我们的测试目录
    # virtual_mode=True 确保 Agent 只能访问该目录下的文件,不能访问宿主机其他目录
    backend = FilesystemBackend(root_dir=WORK_DIR, virtual_mode=True)

    # 3. 创建 Agent
    # 我们明确指示 Agent 使用分页读取,每次读取 300 行,Agent默认每次读取500行(DEFAULT_READ_LIMIT)
    system_prompt = """
    你是一个专业的系统管理员。
    你的任务是从日志文件中查找特定的错误代码。
    注意:日志文件可能非常大,为了避免上下文溢出,你必须使用 `read_file` 工具的分页功能。
    每次读取请限制在 300 行以内 (limit=300),并使用 offset 参数向后滚动。
    直到找到目标信息为止。
    """

    llm = ChatOpenAI(model="gpt-4o", temperature=0)

    agent = create_deep_agent(
        model=llm,
        backend=backend,
        system_prompt=system_prompt
    )

    task = f"请在 '/{LARGE_FILE_NAME}' 中查找包含 '{TARGET_SECRET}' 的行,并告诉我它的具体内容。"

    print(f"\n任务: {task}")
    print("-" * 60)

    # 4. 执行任务并观察分页行为
    step = 0
    print("\n开始流式输出...")
    try:
        async for event in agent.astream({"messages": [("user", task)]}):
            for node_name, node_data in event.items():
                # debug: print(f"DEBUG: Node: {node_name}")
                if not node_data: continue

                # 处理 Overwrite 对象
                if hasattr(node_data, "value"):
                    node_data = node_data.value

                if not isinstance(node_data, dict):
                    continue

                if "messages" in node_data:
                    msgs = node_data["messages"]
                    if hasattr(msgs, "value"):
                        msgs = msgs.value

                    if not isinstance(msgs, list): msgs = [msgs]

                    for msg in msgs:
                        # 1. 打印 Agent 的思考 (AIMessage with tool_calls)
                        if hasattr(msg, "tool_calls") and msg.tool_calls:
                            step += 1
                            print(f"\n[Step {step}] Agent 决定调用工具 (Node: {node_name}):")
                            for tc in msg.tool_calls:
                                name = tc['name']
                                args = tc['args']
                                print(f"  >>> 工具: {name}")

                                if name == "read_file":
                                    offset = args.get('offset', 0)
                                    limit = args.get('limit', 'Default')
                                    path_val = args.get('path') or args.get('file_path')
                                    print(f"  >>> 参数: path='{path_val}', offset={offset}, limit={limit}")
                                    print(f"      (说明: 正在读取从第 {offset} 行开始的 {limit} 行数据)")
                                else:
                                    print(f"  >>> 参数: {args}")

                        # 2. 打印工具的输出 (ToolMessage)
                        elif isinstance(msg, ToolMessage):
                            content = msg.content
                            line_count = len(content.splitlines())
                            # 检查是否包含目标 Secret
                            found_secret = TARGET_SECRET in content

                            preview = content[:100].replace('\n', ' ') + "..."
                            print(f"\n[Tool Output] (Node: {node_name}) 读取了 {line_count} 行数据")
                            print(f"  内容预览: {preview}")
                            if found_secret:
                                print(f"  ✨ 成功: 在此分块中发现了目标 Secret: {TARGET_SECRET}")

                        # 3. 打印 Agent 的最终回复 (AIMessage without tool_calls)
                        elif isinstance(msg, BaseMessage) and msg.type == "ai" and msg.content:
                            print(f"\n[Agent 最终回复] (Node: {node_name}):")
                            print("-" * 40)
                            print(msg.content)
                            print("-" * 40)


    except Exception as e:
        print(f"❌ 运行出错: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    await run_pagination_demo()

4、 E2BBackend 使用E2B云沙箱 链接到标题

E2B (Environment To Be) 是一个 专为 AI 智能体设计的云端安全执行环境 。

你可以把它想象成一台 云端电脑 或 远程服务器 ,DeepAgents 将其作为“外挂大脑”和“执行手脚”。当 AI 需要写代码、运行脚本或操作文件时,它不会在你的本地机器上操作,而是连接到 E2B 的云端环境中进行。

  • 核心特性
  1. 安全性与隔离性 (Security & Isolation) :

    • AI 生成的代码(可能包含错误或恶意逻辑)完全运行在云端沙箱中, 绝不会破坏你本地的电脑环境 。
    • 演示代码中,Agent 即使执行了 rm -rf / ,也只是删除了云端临时的沙箱,对宿主机毫发无损。
  2. 持久化会话 (Long-running Sessions) :

    • 沙箱可以保持运行状态。Agent 可以先创建一个文件(如演示中的 /home/user/hello.py ),然后在后续步骤中运行它。环境状态在会话期间是保持的。
  3. 标准 Linux 环境 :

    • 它提供标准的 Linux Shell。演示中 Agent 执行了 uname -a 和 python –version ,就像在真实的服务器上一样。

step1: 访问 https://e2b.dev 注册登陆,然后并获取 API_Key

  • 登陆后进入首页

  • 在左侧选择API Keys选项,创建API

  • 创建API Key后记得保存好,保存到.env文件中,作为:E2B_API_KEY

step2: 安装依赖

!pip install e2b

# 定义 E2B 沙箱后端类
# 该类继承自 BaseSandbox,实现了在 E2B 沙箱中执行命令的功能

import base64
from typing import Any, Optional

# 导入 DeepAgents 后端协议,定义了沙箱后端的接口
from deepagents.backends.protocol import (
    ExecuteResponse,
    FileDownloadResponse,
    FileUploadResponse,
    SandboxBackendProtocol,
)
# 导入基础沙箱类,用于实现沙箱后端的基本功能
from deepagents.backends.sandbox import BaseSandbox

try:
    from e2b import Sandbox
except ImportError:
    Sandbox = None

class E2BBackend(BaseSandbox):
    """E2B 沙箱后端实现,用于 DeepAgents。

    该后端使用 E2B(https://e2b.dev)提供安全、隔离的执行环境。
    """

    def __init__(
        self,
        template: str = "base",
        api_key: Optional[str] = None,
        timeout: Optional[int] = None,
        metadata: Optional[dict[str, str]] = None,
    ) -> None:
        """初始化 E2B 沙箱。

        参数:
            template: E2B 沙箱模板 ID(默认:"base")
            api_key: E2B API 密钥(可选,默认使用 E2B_API_KEY 环境变量)
            timeout: 沙箱超时时间(秒)
            metadata: 自定义沙箱元数据
        """
        if Sandbox is None:
            raise ImportError(
                "e2b package is not installed. "
                "Please install it with `pip install e2b`."
            )

        self.sandbox = Sandbox.create(
            template=template,
            api_key=api_key,
            timeout=timeout,
            metadata=metadata,
        )

    @property
    def id(self) -> str:
        """Unique identifier for the sandbox backend."""
        return self.sandbox.sandbox_id

    def execute(self, command: str) -> ExecuteResponse:
        """Execute a command in the sandbox."""
        try:
            # E2B commands.run returns CommandResult with stdout, stderr, exit_code
            result = self.sandbox.commands.run(command)

            # 返回执行结果,包含 stdout 是标准输出,stderr 是标准错误输出,exit_code 是退出码
            return ExecuteResponse(
                output=result.stdout + result.stderr,
                exit_code=result.exit_code,
                truncated=False,
            )
        except Exception as e:
            return ExecuteResponse(
                output=f"Error executing command: {str(e)}",
                exit_code=1,
                truncated=False,
            )

    def upload_files(self, files: list[tuple[str, bytes]]) -> list[FileUploadResponse]:
        """Upload multiple files to the sandbox."""
        responses = []
        for path, content in files:
            try:
                # Ensure directory exists before writing
                # We can use execute to mkdir -p
                parent_dir = path.rsplit("/", 1)[0]
                if parent_dir:
                    self.sandbox.commands.run(f"mkdir -p {parent_dir}")

                # Write file
                self.sandbox.files.write(path, content)
                responses.append(FileUploadResponse(path=path, error=None))
            except Exception as e:
                error_msg = str(e).lower()
                error = "invalid_path"
                if "permission" in error_msg:
                    error = "permission_denied"

                responses.append(FileUploadResponse(path=path, error=error))
        return responses

    def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
        """Download multiple files from the sandbox."""
        responses = []
        for path in paths:
            try:
                content = self.sandbox.files.read(path)
                # Ensure content is bytes
                if isinstance(content, str):
                    content = content.encode("utf-8")

                responses.append(FileDownloadResponse(path=path, content=content, error=None))
            except Exception as e:
                error_msg = str(e).lower()
                error = "invalid_path"
                if "not found" in error_msg:
                    error = "file_not_found"
                elif "directory" in error_msg:
                    error = "is_directory"
                elif "permission" in error_msg:
                    error = "permission_denied"

                responses.append(FileDownloadResponse(path=path, content=None, error=error))
        return responses

    def close(self):
        """Close the sandbox session."""
        self.sandbox.kill()
  • 将自定义好的沙箱后端类注册到 DeepAgents 中
import asyncio
import os
import sys
from dotenv import load_dotenv
from deepagents import create_deep_agent
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_core.messages import BaseMessage, ToolMessage

# 假设 e2b_backend.py 在同一目录下
try:
    from e2b_backend import E2BBackend
except ImportError:
    # 尝试从当前路径导入
    sys.path.append(os.path.dirname(os.path.abspath(__file__)))
    from e2b_backend import E2BBackend

# 加载环境变量
load_dotenv(override=True)

async def setup_mcp_tools():
    """连接 Context7 MCP 服务器并获取工具"""
    print("正在连接 Context7 MCP 服务器...")
    try:
        # 使用官方 Context7 MCP 配置
        client = MultiServerMCPClient({
            "context7": {
                "transport": "stdio",
                "command": "npx",
                "args": ["-y", "@upstash/context7-mcp@latest"],
            }
        })
        # 获取工具
        tools = await client.get_tools()
        print(f"成功加载 {len(tools)} 个 MCP 工具")
        return client, tools
    except Exception as e:
        print(f"❌ 连接 MCP 失败: {e}")
        return None, []

async def run_e2b_demo():
    print("\n" + "="*80)
    print("DeepAgents E2BBackend (Sandbox) 演示 ")
    print("="*80)

    # 1. 检查 API Key
    if not os.getenv("E2B_API_KEY"):
        print("❌ 错误: 未找到 E2B_API_KEY 环境变量。")
        print("请在 .env 文件中设置 E2B_API_KEY,或直接导出该变量。")
        return

    # 2. 初始化 MCP (用于查询文档等辅助任务)
    mcp_client, mcp_tools = await setup_mcp_tools()

    # 3. 初始化 E2B Backend
    print("正在初始化 E2B 沙箱 (Template: base)...")
    try:
        backend = E2BBackend(template="base")
        print(f"沙箱已启动 (ID: {backend.id})")
    except Exception as e:
        print(f"❌ 沙箱启动失败: {e}")
        return

    try:
        # 4. 创建 DeepAgent
        llm = ChatOpenAI(model="gpt-4o", temperature=0)

        agent = create_deep_agent(
            model=llm,
            tools=mcp_tools,     # 赋予 MCP 工具能力
            backend=backend,     # 赋予 E2B 沙箱能力
            system_prompt="""你是一个拥有云端沙箱环境的高级技术助手。
            你的任务是演示如何在沙箱中进行操作。
            请执行以下步骤:
            1. 使用 'execute_command' 运行 'uname -a' 和 'python --version' 来展示环境信息。
            2. 创建一个 Python 脚本 '/home/user/hello.py',内容是打印 'Hello from E2B Sandbox!'。
            3. 运行这个 Python 脚本并显示输出。
            """
        )

        print(f"\n任务开始")

        # 5. 执行任务
        task = "请开始演示沙箱操作流程。"

        step = 0
        async for event in agent.astream({"messages": [("user", task)]}):
            step += 1
            for node_name, node_data in event.items():
                if node_data is None: continue

                if "messages" in node_data:
                    msgs = node_data["messages"]
                    if not isinstance(msgs, list): msgs = [msgs]

                    for msg in msgs:
                        if isinstance(msg, BaseMessage) and msg.content:
                            # 过滤掉空的工具调用消息
                            is_tool_call_msg = getattr(msg, "tool_calls", None)
                            if not is_tool_call_msg:
                                print(f"\n[Agent ({node_name})]")
                                print("-" * 40)
                                print(msg.content)
                                print("-" * 40)

                        if hasattr(msg, "tool_calls") and msg.tool_calls:
                            print(f"\n[Step {step}: 工具调用]")
                            for tc in msg.tool_calls:
                                args_str = str(tc['args'])
                                if len(args_str) > 500:
                                    args_str = args_str[:500] + "..."
                                print(f"  • {tc['name']}: {args_str}")

                        if isinstance(msg, ToolMessage):
                            content_preview = str(msg.content)
                            if len(content_preview) > 200:
                                content_preview = content_preview[:200] + "..."
                            print(f"[Tool Output ({msg.name})]: {content_preview}")

    except Exception as e:
        print(f"❌ 运行时错误: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # 6. 清理资源
        print("\n正在关闭沙箱...")
        backend.close()
        print("沙箱已关闭")
        print("演示结束")

if __name__ == "__main__":
    try:
        # asyncio.run(run_e2b_demo())
        await run_e2b_demo()
    except RuntimeError as e:
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            print("请在 Jupyter 中使用 await run_e2b_demo()")
        else:
            raise e

5、DockerBackend 使用Docker容器 链接到标题

DockerBackend 的核心作用是为 AI 智能体提供一个 安全沙箱(Sandbox) 。

如果不使用 Docker,Agent 执行的每一条命令(如 rm -rf 、 pip install )都会直接发生在您的宿主机(Mac)上,这极其危险且环境不可控。

  • 作用与优势

  • 安全隔离 (Security & Isolation)

    • 作用 :Agent 的所有操作(文件读写、代码执行、系统命令)都被限制在 Docker 容器内部。

    • 优势 :即使 Agent 产生幻觉执行了恶意代码(如删除系统文件),也只会破坏容器, 您的 Mac 宿主机毫发无损 。演示代码中的 auto_remove=True 确保任务结束后容器自动销毁,不留痕迹。

  • 环境一致性 (Reproducibility)

    • 作用 :代码指定了镜像 image=“python:3.11-slim” 。

    • 优势 :无论您的电脑安装的是 Python 3.9 还是 3.12,Agent 永远在一个干净、标准的 Python 3.11 环境中运行。这解决了“在我的机器上能跑”的经典依赖问题。

  • 生命周期管理 (Lifecycle Management)

    • 作用 : DockerBackend 自动处理容器的 启动 -> 连接 -> 执行 -> 销毁 全过程。

    • 优势 :开发者无需手动编写复杂的 Docker 命令,像使用本地对象一样简单地调用 backend.execute() 或 backend.write_file() 。

# 需要先安装一下docker依赖
#!pip install docker

#这个 Backend 会在本地启动一个 Docker 容器,并将会话隔离在容器内部。

# - 核心功能 :
#   - 自动生命周期管理 :初始化时启动容器,结束时自动销毁 ( auto_remove=True )。
#   - 高效文件传输 :使用 tar 流在宿主机和容器之间传输文件,支持批量操作。
#   - 资源限制 :支持设置 CPU ( cpu_quota ) 和内存 ( memory_limit ) 限制,防止 Agent 耗尽本机资源。
#   - 网络控制 :可选禁用网络 ( network_disabled=True ) 以增强安全性。

import io
import tarfile
import time
import uuid
from typing import Optional

# 加载 DeepAgents 后端协议
from deepagents.backends.protocol import (
    ExecuteResponse,
    FileDownloadResponse,
    FileUploadResponse,
    SandboxBackendProtocol,
)

# 加载 DeepAgents 基础沙箱类
from deepagents.backends.sandbox import BaseSandbox

try:
    import docker
    from docker.errors import NotFound, APIError
except ImportError:
    docker = None

class DockerBackend(BaseSandbox):
    """Docker 沙箱后端实现,用于 DeepAgents。

    该后端使用本地 Docker 守护进程提供隔离的执行环境。
    需要安装 `docker` Python 包,并确保 Docker 守护进程正在运行。
    """

    def __init__(
        self,
        image: str = "python:3.11-slim",
        auto_remove: bool = True,
        cpu_quota: int = 50000,  # 50% CPU
        memory_limit: str = "512m",
        network_disabled: bool = False,
        working_dir: str = "/workspace",
        volumes: dict[str, dict[str, str]] | None = None,
    ) -> None:
        """初始化 Docker 沙箱。

        参数:
            image: 使用的 Docker 镜像(默认:"python:3.11-slim")
            auto_remove: 是否在关闭时移除容器(默认:True)
            cpu_quota: CPU 配额,单位为微秒(默认:50000)
            memory_limit: 内存限制(默认:"512m")
            network_disabled: 是否禁用网络访问(默认:False)
            working_dir: 容器内的工作目录(默认:"/workspace")
            volumes: Docker 卷配置,例如 {'/宿主机路径': {'bind': '/容器路径', 'mode': 'rw'}}
        """
        if docker is None:
            raise ImportError(
                "docker package is not installed. "
                "Please install it with `pip install docker`."
            )
        # 初始化 Docker 客户端,from_env() 会自动从环境变量中读取 Docker 配置
        self.client = docker.from_env()
        self.image = image
        self.auto_remove = auto_remove
        self.working_dir = working_dir
        self.volumes = volumes or {}
        self._container = None

        # Start container
        try:
            # Ensure image exists
            try:
                self.client.images.get(image)
            except NotFound:
                print(f"Pulling image {image}...")
                self.client.images.pull(image)

            self._container = self.client.containers.run(
                image,
                command="tail -f /dev/null",  # Keep container running
                detach=True,
                tty=True,
                cpu_quota=cpu_quota,
                mem_limit=memory_limit,
                network_disabled=network_disabled,
                working_dir=working_dir,
                volumes=self.volumes,
            )

            # Ensure working directory exists
            self.execute(f"mkdir -p {working_dir}")

        except Exception as e:
            raise RuntimeError(f"Failed to start Docker container: {e}")

    @property
    def id(self) -> str:
        """Unique identifier for the sandbox backend."""
        return self._container.id if self._container else "unknown"

    def execute(self, command: str) -> ExecuteResponse:
        """Execute a command in the sandbox."""
        if not self._container:
            return ExecuteResponse(
                output="Container not running",
                exit_code=1,
                truncated=False
            )

        try:
            # Docker exec_run 返回 (exit_code, output)
            # output 是字节类型
            # 使用列表形式的 cmd 以避免 shell 转义问题
            exit_code, output = self._container.exec_run(
                cmd=["bash", "-c", command],
                workdir=self.working_dir,
                demux=False # Combine stdout and stderr
            )

            return ExecuteResponse(
                output=output.decode("utf-8", errors="replace"),
                exit_code=exit_code,
                truncated=False,
            )
        except Exception as e:
            return ExecuteResponse(
                output=f"Error executing command: {str(e)}",
                exit_code=1,
                truncated=False,
            )

    def upload_files(self, files: list[tuple[str, bytes]]) -> list[FileUploadResponse]:
        """Upload multiple files to the sandbox using tar archive."""
        if not self._container:
            return [FileUploadResponse(path=p, error="permission_denied") for p, _ in files]

        responses = []

        # Create a tar archive in memory
        tar_stream = io.BytesIO()
        with tarfile.open(fileobj=tar_stream, mode='w') as tar:
            for path, content in files:
                # Docker put_archive expects relative paths inside the tar to be relative to the destination
                # But here we want absolute paths to be respected.
                # Actually put_archive extracts to a directory.
                # To support absolute paths, we should probably upload to root /?
                # Or handle relative paths relative to working_dir.

                # Let's handle paths:
                # If path is absolute, we strip leading / and upload to root.
                # If path is relative, we upload to working_dir.

                # Simplification: We will create a tar with full structure and extract to /

                # Normalize path
                if path.startswith("/"):
                    arcname = path.lstrip("/")
                    dest_path = "/"
                else:
                    arcname = path
                    dest_path = self.working_dir

                info = tarfile.TarInfo(name=arcname)
                info.size = len(content)
                info.mtime = time.time()
                tar.addfile(info, io.BytesIO(content))

                responses.append(FileUploadResponse(path=path, error=None))

        tar_stream.seek(0)

        try:
            # We extract to / to support absolute paths in the tar
            # Note: This assumes all files in the batch can be extracted to the same root.
            # If mixed absolute/relative, this might be tricky.
            # For robustness, we might need to upload one by one if paths are mixed,
            # or group them.
            # Strategy: Always extract to / (root), and ensure arcnames are full paths (without leading /)

            self._container.put_archive(
                path="/",
                data=tar_stream
            )
        except Exception as e:
            # Mark all as failed if batch fails
            return [FileUploadResponse(path=p, error="permission_denied") for p, _ in files]

        return responses

    def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
        """Download multiple files from the sandbox."""
        if not self._container:
            return [FileDownloadResponse(path=p, error="permission_denied") for p in paths]

        responses = []
        for path in paths:
            try:
                # get_archive returns a tuple (generator, stat)
                bits, stat = self._container.get_archive(path)

                # Reconstruct tar from bits
                file_content = io.BytesIO()
                for chunk in bits:
                    file_content.write(chunk)
                file_content.seek(0)

                # Extract file from tar
                with tarfile.open(fileobj=file_content, mode='r') as tar:
                    # There should be only one file/dir
                    member = tar.next()
                    if member.isdir():
                        responses.append(FileDownloadResponse(path=path, error="is_directory"))
                        continue

                    f = tar.extractfile(member)
                    if f:
                        content = f.read()
                        responses.append(FileDownloadResponse(path=path, content=content, error=None))
                    else:
                        responses.append(FileDownloadResponse(path=path, error="file_not_found"))

            except NotFound:
                responses.append(FileDownloadResponse(path=path, error="file_not_found"))
            except Exception as e:
                error_msg = str(e).lower()
                error = "invalid_path"
                if "permission" in error_msg:
                    error = "permission_denied"

                responses.append(FileDownloadResponse(path=path, content=None, error=error))
        return responses

    def close(self):
        """Close the sandbox session."""
        if self._container:
            try:
                if self.auto_remove:
                    self._container.remove(force=True)
                else:
                    self._container.stop()
            except Exception:
                pass
            self._container = None
  • 使用自定义的DockerBackend实现沙箱环境
import asyncio
import os
import sys
from dotenv import load_dotenv
from deepagents import create_deep_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage

# 尝试导入 DockerBackend
try:
    from docker_backend import DockerBackend
except ImportError:
    sys.path.append(os.path.dirname(os.path.abspath(__file__)))
    from docker_backend import DockerBackend

load_dotenv(override=True)

async def run_docker_demo():
    print("\n" + "="*80)
    print("DeepAgents DockerBackend 演示 (极简版)")
    print("="*80)

    # 1. 检查 Docker 环境
    try:
        import docker
        docker.from_env().ping()
    except Exception as e:
        print(f"❌ Docker 未运行或未安装: {e}")
        print("请确保 Docker Desktop 已启动。")
        return

    # 2. 初始化 Docker Backend
    print("正在启动 Docker 容器 (Image: python:3.11-slim)...")
    try:
        backend = DockerBackend(
            image="python:3.11-slim",
            auto_remove=True
        )
        print(f"容器已启动 (ID: {backend.id[:12]})")
    except Exception as e:
        print(f"❌ 启动失败: {e}")
        return

    try:
        # 3. 创建 Agent
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

        agent = create_deep_agent(
            model=llm,
            backend=backend,
            system_prompt="""你是一个运行在 Docker 容器中的 AI 助手。
            你的任务是演示环境隔离性。

            请执行以下步骤:
            1. 运行 'cat /etc/os-release' 查看容器操作系统。
            2. 运行 'python --version' 确认 Python 环境。
            3. 创建文件 '/workspace/hello_docker.py',内容为打印 'Hello from Docker Container!'。
            4. 运行该脚本。
            """
        )

        print(f"\n任务开始")

        # 4. 执行任务
        async for event in agent.astream({"messages": [("user", "请开始演示。")]}):
            for node_name, node_data in event.items():
                if not node_data:
                    continue

                # 处理 Overwrite 对象
                if hasattr(node_data, "value"):
                    node_data = node_data.value

                # 再次检查是否为字典
                if not isinstance(node_data, dict):
                    continue

                if "messages" in node_data:
                    messages = node_data["messages"]
                    if hasattr(messages, "value"):
                        messages = messages.value

                    if isinstance(messages, list) and messages:
                        last_msg = messages[-1]
                        if isinstance(last_msg, BaseMessage) and last_msg.content:
                            print(f"\n[Agent ({node_name})]")
                            print("-" * 40)
                            print(last_msg.content)
                            print("-" * 40)

    except Exception as e:
        print(f"❌ 运行时错误: {e}")
        import traceback
        traceback.print_exc()
    finally:
        print("\n正在清理容器...")
        backend.close()
        print("容器已移除")
        print("\n演示结束")

if __name__ == "__main__":
    await run_docker_demo()
  • 这里可以看一下DockerBackend和E2BBackend的区别
特性DockerBackendE2BBackend
核心定位本地轻量级容器化沙箱云端安全沙箱环境 (SaaS)
部署位置运行在本地机器 (Localhost)运行在 E2B 云端集群 (Remote Cloud)
依赖环境需要本地安装并运行 Docker Desktop/Daemon仅需安装 e2b Python SDK,无需本地 Docker
资源消耗消耗本地 CPU/内存资源消耗 E2B 云端资源 (不占用本地算力)
启动速度快 (本地镜像启动,毫秒-秒级)较快 (云端冷启动约 1-3秒)
网络隔离可配置 (支持完全离线 network_disabled=True)默认联网 (支持访问公网 API)
持久化支持挂载本地卷 (Volumes) 实现数据持久化临时环境 (会话结束即销毁),数据需手动导出
适用场景• 本地开发/调试• 数据隐私敏感 (不想数据出本地)• 离线环境使用• 生产环境部署 (无需维护 Docker)• 多租户隔离 (每个用户一个云沙箱)• 本地资源受限设备
成本免费 (使用自有硬件)付费 (按使用时长/资源计费)
配置复杂度中 (需管理镜像、卷挂载、Docker 进程)低 (API Key 开箱即用)

6、StoreBackend 使用数据库存储 链接到标题

  • step1 : 这里我们使用postgresql数据库进行数据的存储,所以需要先系统安装postgresql在本地环境

  • step2 : 安装完成之后,可以使用Docker来部署启动postgresql数据库,我这里使用的docker-compose.yml文件启动的,需要把password密码,user用户名,database数据库名称修改为自己的,使用docker-compose up -d启动数据库,如下所示:

#docker-compose.yml 文件:

services:
  postgres:
    image: postgres:15
    container_name: my-postgres
    environment:
      - POSTGRES_PASSWORD=123456
      - POSTGRES_USER=myuser
      - POSTGRES_DB=mydatabase
    ports:
      - "5432:5432"
    volumes:
      - pg_data:/data/db
      - ./conf.d:/data/conf.d
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    restart: unless-stopped

volumes:
  pg_data:
    driver: local
#!pip install langgraph-checkpoint-postgres  # 生产环境使用
# 测试数据库是否连接正常,-U 指定用户名,-d 指定数据库名称
!psql -U myuser -d mydatabase -c "SELECT version();"
import asyncio
import os
import uuid
import traceback
from dotenv import load_dotenv

# LangChain / LangGraph Imports
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_core.messages import BaseMessage, ToolMessage
from langgraph.store.postgres import PostgresStore
from langgraph.checkpoint.memory import MemorySaver
from psycopg_pool import ConnectionPool

# DeepAgents Imports
from deepagents import create_deep_agent
from deepagents.backends import StoreBackend

# 加载环境变量
load_dotenv(override=True)

# 数据库连接字符串 (请根据实际情况修改)
DB_URI = "postgresql://myuser:123456@localhost:5432/mydatabase"

def print_header():
    print("\n" + "="*80)
    print("DeepAgents StoreBackend (PostgreSQL) 演示 (极简版)")
    print("="*80)

async def setup_mcp_tools():
    """
    连接 Context7 MCP 服务器并获取工具。
    """
    print("Step 1.1: 正在连接 Context7 MCP 服务器...")
    try:
        # 使用官方 Context7 MCP 配置
        client = MultiServerMCPClient({
            "context7": {
                "transport": "stdio",
                "command": "npx",
                "args": ["-y", "@upstash/context7-mcp@latest"],
            }
        })

        # 获取工具列表
        tools = await client.get_tools()

        print(f"Step 1.2: 成功加载 {len(tools)} 个 MCP 工具")
        return client, tools
    except Exception as e:
        print(f"ERROR: 连接 MCP 失败: {e}")
        return None, []


async def run_store_backend_demo():
    """
    运行 StoreBackend 演示:
    展示如何使用 PostgreSQL 作为 DeepAgents 的文件系统后端。
    """
    print_header()

    # =================================================================================================
    # Step 1. 初始化 MCP 工具 (Connect to Context7 MCP Server)
    # =================================================================================================
    mcp_client, mcp_tools = await setup_mcp_tools()


    # =================================================================================================
    # Step 2. 初始化 PostgreSQL 连接池和存储组件 (Init DB & Store)
    # =================================================================================================
    print(f"\nStep 2: 连接数据库 {DB_URI} 并初始化存储...")

    # 使用 ConnectionPool 管理数据库连接
    # StoreBackend 需要同步的 ConnectionPool (因为它在线程中运行同步操作)
    try:
        with ConnectionPool(conninfo=DB_URI, kwargs={"autocommit": True}) as pool:

            # 2.1 初始化 Checkpointer (用于保存会话状态/聊天记录)
            # 注意: 这里使用 MemorySaver 是为了避开 langgraph-checkpoint-postgres 在某些环境下的异步兼容性问题
            checkpointer = MemorySaver()

            # 2.2 初始化 Store (用于保存文件/长期记忆)
            # DeepAgents 的 StoreBackend 会将文件系统操作映射到这个 PostgresStore
            store = PostgresStore(pool)

            # 2.3 执行数据库迁移 (首次运行需要初始化表结构)
            # 这会在数据库中创建 'store' 表,用于存储 JSON 数据 (即我们的文件)
            with pool.connection() as conn:
                with conn.cursor() as cur:
                    for migration in store.MIGRATIONS:
                        cur.execute(migration)

            print("Step 2: 数据库表结构就绪")


            # =================================================================================================
            # Step 3. 创建基于 StoreBackend 的 Agent (Create Agent)
            # =================================================================================================

            # StoreBackend 将文件系统操作映射到 LangGraph 的 Store
            # 这意味着文件将持久化存储在 PostgreSQL 数据库中,跨重启可用
            llm = ChatOpenAI(model="gpt-4o", temperature=0)

            # 定义 Backend Factory
            # 这是一个工厂函数,用于在运行时(runtime)把 StoreBackend 实例化。
            # create_deep_agent 会在内部把 LangGraph 的 runtime 对象作为参数 rt 传进来,
            # 这样 StoreBackend 就能拿到 store/checkpointer 等资源,实现文件系统到 PostgreSQL 的映射。
            backend_factory = lambda rt: StoreBackend(rt)
            print("Step 3: 正在初始化 DeepAgent (Backend: StoreBackend -> PostgresStore)...")

            agent = create_deep_agent(
                model=llm,
                tools=mcp_tools,
                backend=backend_factory,
                store=store,              # 关键: 必须传入 store 实例
                checkpointer=checkpointer, # 关键: 传入 checkpointer 实例
                system_prompt="""你是一个高级技术助手。
                你的任务是使用 Context7 工具查询关于 'DeepAgents StoreBackend' 的用法。
                查询后,创建一个总结文件 '/knowledge/store_backend_notes.md',并写入关键信息。
                由于你使用的是 StoreBackend,这个文件将直接存储在 PostgreSQL 数据库中。
                最后,请读取该文件以验证存储成功。"""
            )


            # =================================================================================================
            # Step 4. 执行任务 (Execute Task)
            # =================================================================================================
            thread_id = str(uuid.uuid4())
            config = {"configurable": {"thread_id": thread_id}}

            task = "请查询 StoreBackend 的用法,并将总结写入 /knowledge/store_backend_notes.md,最后读取它验证。"

            # --- UI Display ------------------------------------------------------------------------------
            print(f"Session Thread ID: {thread_id}")
            print("\n" + "-"*40)
            print(f"Step 4: 开始执行任务: {task}")
            print("-"*40)
            # ---------------------------------------------------------------------------------------------

            step = 0
            message_history_len = 0

            try:
                # 使用 astream 观察执行过程
                async for event in agent.astream({"messages": [("user", task)]}, config=config):
                    if "messages" in event:
                        current_messages = event["messages"]
                        if len(current_messages) > message_history_len:
                            for i in range(message_history_len, len(current_messages)):
                                msg = current_messages[i]

                                # 1. 显示普通文本消息 (Agent 回复)
                                if isinstance(msg, BaseMessage) and msg.content:
                                    is_tool_call = getattr(msg, "tool_calls", None)
                                    if not is_tool_call:
                                        print(f"\n[Agent]:\n{msg.content}")

                                # 2. 显示工具调用 (Agent 请求工具)
                                if hasattr(msg, "tool_calls") and msg.tool_calls:
                                    step += 1
                                    print(f"\n[Step {step}: 工具调用]:")
                                    for tc in msg.tool_calls:
                                        print(f"  • 工具: {tc['name']}")
                                        print(f"  • 参数: {tc['args']}")

                                # 3. 显示工具输出 (Tool 执行结果)
                                if isinstance(msg, ToolMessage):
                                    content_preview = msg.content[:200] + "..." if len(msg.content) > 200 else msg.content
                                    print(f"\n[Tool Output ({msg.name})]: {content_preview}")

                            message_history_len = len(current_messages)

            except Exception as e:
                print(f"❌ 运行时错误: {e}")
                traceback.print_exc()


            # =================================================================================================
            # Step 5. 验证持久化存储 (Verify Persistence)
            # =================================================================================================
            print("\n" + "="*50)
            print("Step 5: 验证 StoreBackend (Postgres) 持久化")
            print("="*50)

            print("\n验证操作: 使用新 Agent 实例 (模拟重启) 读取同一文件...")

            # 模拟重启:创建一个新的 Agent 实例,但连接同一个 Database Store
            # 只要 DB 连接不变,文件系统状态就是持久的
            verify_agent = create_deep_agent(
                model=llm,
                backend=backend_factory,
                store=store,
                checkpointer=checkpointer,
                system_prompt="验证助手"
            )

            # 读取之前创建的文件
            verify_result = await verify_agent.ainvoke({
                "messages": [("user", "请读取 /knowledge/store_backend_notes.md 的内容")]
            }, config=config)

            last_msg = verify_result["messages"][-1]

            # --- UI Display ------------------------------------------------------------------------------
            print("\n[数据库读取验证结果]:")
            print("-" * 20)
            print(last_msg.content)
            print("-" * 20)
            print("\n说明: 'prefix' 对应 StoreBackend 的 namespace (默认为 'filesystem'),'key' 对应文件绝对路径。")

            # SQL 查询提示
            print("\n提示: 你可以使用以下 SQL 在数据库中直接查询此文件:")
            sql_query = """
                        SELECT
                            key,
                            value->>'content' as content,
                            updated_at
                        FROM store
                        WHERE prefix = 'filesystem'
                        AND key = '/knowledge/store_backend_notes.md';
                                """
            print(sql_query.strip())
            # ---------------------------------------------------------------------------------------------

            print("\n演示结束")

    except Exception as e:
        print(f"❌ 数据库连接或初始化失败: {e}")
        print("请检查 PostgreSQL 是否正在运行,以及 DB_URI 是否正确。")


if __name__ == "__main__":
    # 确保 asyncio.run 兼容 Jupyter 环境
    try:
        # asyncio.run(run_store_backend_demo())
        await run_store_backend_demo()
    except RuntimeError as e:
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            print("请在 Jupyter 中使用 await run_store_backend_demo()")
        else:
            raise e
# 查看存入数据库中的结果
!psql -U myuser -d mydatabase -c "SELECT key, value->>'content' as content,updated_at FROM store WHERE prefix = 'filesystem' AND key = '/knowledge/store_backend_notes.md';"
  • 这里看一下Backend vs Checkpointer vs Store 这三者的区别
参数组件Backend (后端)Checkpointer (检查点)Store (存储)
核心定义环境层 (Environment)状态层 (State / Short-term Memory)记忆层 (Memory / Long-term Memory)
负责什么?“外部世界” 的交互能力。即:文件存在哪?代码在哪跑?“当前对话” 的上下文。即:刚才说了什么?现在运行到哪一步了?“跨会话” 的知识积累。即:用户叫什么名字?上次任务学到了什么?
数据类型非结构化文件 (.py, .md, .txt)运行时环境 (Shell, Process)BaseMessage 列表 (User/AI/Tool Message)Graph 节点状态结构化 JSON 数据 (Key-Value)用户偏好、长期笔记
典型实现DockerBackend (容器)FilesystemBackend (磁盘)E2BBackend (云沙箱)MemorySaver (内存)PostgresSaver (数据库)SqliteSaver (本地DB)InMemoryStore (内存)PostgresStore (数据库)
生命周期任务级(任务结束容器可能销毁)线程级 (Thread)(换个 thread_id 就没了)全局级 (Global)(所有 thread 都能查到)
形象比喻工作台 / 电脑大脑的工作记忆 (只会死记硬背当前对话)日记本 / 知识库 (记录永久信息)

7、CompositeBackend 使用混合模式 链接到标题

CompositeBackend是 Agent 文件操作的“智能路由器” 。

在单一后端模式下,Agent 所有的文件操作(读、写、列出目录)都只能去往同一个地方(要么全是本地磁盘,要么全是 Docker 容器内)。而 CompositeBackend 允许你根据 文件路径前缀 ,将请求分发给不同的后端。

  • 核心优势
  1. 性能与开销优化 (Performance) 这是最关键的技术优势。

    • DockerBackend 的局限 : 向 Docker 容器内读写文件(尤其是大文件)需要经过 Docker Daemon 的 API (如 put_archive / get_archive ),涉及网络通信和打包解包,开销较大。

    • CompositeBackend 的解法 : 对于数据文件( /data ),直接通过 FilesystemBackend 进行本地 I/O 操作, 完全绕过了 Docker API ,读写速度是操作系统原生的速度。

  2. 计算与存储分离 (Decoupling)

    • 计算是临时的 : 你的 processor.py 脚本可能只需要运行一次,运行环境(Python 依赖)可能很复杂且容易冲突。放在 Docker 里最合适。

    • 数据是永恒的 : 你的 raw_metrics.txt 和 health_report.txt 是业务资产。通过路由直接落盘到宿主机,即使 Docker 容器崩溃、被删除或重启, 数据毫发无损 且立即可在宿主机访问(如代码 Line 212-222 所示的验证步骤)。

  3. 给予 Agent “混合云” 的能力,Agent 可以像人类工程师一样工作:

    • “我在临时的沙箱里写代码测试(Docker)。”

    • “测试好了,我把结果保存到公司的共享网盘里(Filesystem/Mount)。”

import asyncio
import shutil
import os
import time
from pathlib import Path
from dotenv import load_dotenv

# DeepAgents 导入
from deepagents import create_deep_agent
from deepagents.backends.composite import CompositeBackend
from deepagents.backends.filesystem import FilesystemBackend
from langchain_openai import ChatOpenAI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_core.messages import BaseMessage, ToolMessage

# 导入 DockerBackend
try:
    from docker_backend import DockerBackend
except ImportError:
    try:
        from deepagents.backends.docker import DockerBackend
    except ImportError:
        DockerBackend = None

def print_header():
    print("\n" + "="*80)
    print("DeepAgents CompositeBackend 混合后端演示 (极简版)")
    print("架构:混合云原生模式 (Docker 执行 + 本地持久化)")
    print("="*80)

async def setup_mcp_tools():
    print("  → 正在连接 Context7 MCP 服务器...")
    try:
        client = MultiServerMCPClient({
            "context7": {
                "transport": "stdio",
                "command": "npx",
                "args": ["-y", "@upstash/context7-mcp@latest"],
            }
        })
        tools = await client.get_tools()
        print("  → MCP 工具加载成功")
        return client, tools
    except Exception as e:
        print(f"ERROR: MCP 连接失败: {e}")
        return None, []

async def run_composite_demo():
    load_dotenv(override=True)
    print_header()

    if DockerBackend is None:
        print("严重错误: 未找到 DockerBackend。请确保 docker_backend.py 存在。")
        return

    # Step 1
    print("\n" + "-"*40)
    print("步骤 1: 配置混合环境")
    print("-"*40)

    host_work_dir = Path("workspace/data_analysis_project").resolve()
    if host_work_dir.exists():
        shutil.rmtree(host_work_dir)
    host_work_dir.mkdir(parents=True, exist_ok=True)
    print(f"  • 宿主机持久层: {host_work_dir}")

    container_mount_path = "/data"
    docker_volumes = {
        str(host_work_dir): {'bind': container_mount_path, 'mode': 'rw'}
    }
    print(f"  • 容器挂载:      {host_work_dir}{container_mount_path}")

    # Step 2
    print("\n" + "-"*40)
    print("步骤 2: 初始化混合后端 (Composite Backend)")
    print("-"*40)

    fs_backend = FilesystemBackend(root_dir=host_work_dir, virtual_mode=True)

    print("  • 正在启动 Docker 容器 (python:3.11-slim)...")
    docker_backend = DockerBackend(
        image="python:3.11-slim",
        auto_remove=True,
        volumes=docker_volumes
    )

    routes = {
        container_mount_path: fs_backend
    }
    backend = CompositeBackend(default=docker_backend, routes=routes)

    print("\n[路由表配置]")
    print(f"1. 默认路由 (/): DockerBackend (临时执行)")
    print(f"2. 持久化路由 ({container_mount_path}/*): FilesystemBackend (宿主机存储)")

    # Step 3
    print("\n" + "-"*40)
    print("步骤 3: 部署 Agent")
    print("-"*40)

    mcp_client, mcp_tools = await setup_mcp_tools()

    system_prompt = f"""你是一名在混合环境中工作的高级数据工程师。

    环境地图:
    1. 执行层 (根目录 `/`):
       - 临时的 Docker 容器。
       - 用于创建脚本 (`.py`) 和运行命令。
       - 这里的文会在会话结束后消失。

    2. 存储层 (`{container_mount_path}`):
       - 从宿主机挂载的持久化存储。
       - 用于存放 输入 数据和 输出 报告。
       - 这里的文件会永久保存。

    你的任务:
    1. **摄入**: 创建一个文件 `{container_mount_path}/raw_metrics.txt`,内容为 "CPU: 45%, Mem: 60%"。
       (注意: 这使用了 'write_file' 工具,该工具通过路由直接写入宿主机文件系统)。

    2. **处理**: 创建一个 Python 脚本 `/processor.py` (在根目录),该脚本:
       - 读取 `{container_mount_path}/raw_metrics.txt`。
       - 计算 "健康分数" (模拟一下即可)。
       - 将报告写入 `{container_mount_path}/health_report.txt`。
       - 打印 "Analysis Complete"。

    3. **执行**: 使用 `python /processor.py` 运行脚本。
       (注意: 这在 Docker 内部运行。Docker 因为卷挂载能看到这些文件)。

    4. **验证**: 读取 `{container_mount_path}/health_report.txt` 并显示它。
    """

    agent = create_deep_agent(
        model=ChatOpenAI(model="gpt-4o", temperature=0),
        tools=mcp_tools,
        backend=backend,
        system_prompt=system_prompt
    )

    # Step 4
    print("\n" + "-"*40)
    print("步骤 4: 任务执行")
    print("-"*40)

    task_input = "开始工程流水线。"
    config = {"configurable": {"thread_id": "composite_demo_simple_v1"}}

    step_count = 0
    try:
        message_history_len = 0

        async for event in agent.astream({"messages": [("user", task_input)]}, config=config):
            if "messages" in event:
                current_messages = event["messages"]
                if len(current_messages) > message_history_len:
                    for i in range(message_history_len, len(current_messages)):
                        msg = current_messages[i]

                        # Agent Thinking
                        if isinstance(msg, BaseMessage) and msg.content and not getattr(msg, "tool_calls", None):
                            step_count += 1
                            print(f"\n[🧠 Agent 思考 (步骤 {step_count})]:\n{msg.content}")

                        # Tool Calls
                        if hasattr(msg, "tool_calls") and msg.tool_calls:
                            step_count += 1
                            for tc in msg.tool_calls:
                                tool_name = tc['name']
                                args = tc['args']

                                # Routing logic visualization
                                target = "Docker 容器 🐳"
                                path_arg = args.get('file_path') or args.get('path')
                                if path_arg and str(path_arg).startswith(container_mount_path):
                                    target = "宿主机文件系统 💾"

                                print(f"\n[🛠️ 工具执行 (步骤 {step_count})]:")
                                print(f"  • 工具: {tool_name}")

                                # Special handling for code content
                                if tool_name == "write_file" and path_arg and str(path_arg).endswith(".py"):
                                    code_content = args.get("content", "")
                                    # Print args without content first
                                    args_copy = args.copy()
                                    args_copy['content'] = "(代码内容如下...)"
                                    print(f"  • 参数: {args_copy}")
                                    print(f"  • 路由: → {target}")
                                    print(f"  • 📝 写入代码内容:\n")
                                    print("-" * 20)
                                    print(code_content)
                                    print("-" * 20)
                                else:
                                    print(f"  • 参数: {str(args)[:200] + '...' if len(str(args)) > 200 else args}")
                                    print(f"  • 路由: → {target}")

                        # Tool Outputs
                        if isinstance(msg, ToolMessage):
                            content = msg.content
                            if len(content) > 300:
                                content = content[:300] + "... [已截断]"
                            print(f"\n[↳ 输出]: {content}")

                    message_history_len = len(current_messages)

    except Exception as e:
        print(f"\n运行时错误: {e}")

    # Step 5
    print("\n" + "-"*40)
    print("步骤 5: 宿主机侧验证")
    print("-"*40)

    report_path = host_work_dir / "health_report.txt"
    raw_path = host_work_dir / "raw_metrics.txt"

    if raw_path.exists():
        print(f"✅ 原始数据已找到: {raw_path} (通过直接 FS 路由创建)")
    else:
        print(f"❌ 原始数据丢失: {raw_path}")

    if report_path.exists():
        content = report_path.read_text()
        print(f"\n🏆 持久化验证成功! 文件: {report_path}")
        print("内容:")
        print("-" * 20)
        print(content)
        print("-" * 20)
    else:
        print(f"❌ 报告丢失: {report_path}")

    # Step 6
    print("\n正在关闭基础设施...")
    if 'docker_backend' in locals() and hasattr(docker_backend, "close"):
        docker_backend.close()
        print("  • Docker 容器已终止")

    print("\n✨ 演示圆满完成!")

if __name__ == "__main__":
    try:
        # asyncio.run(run_composite_demo())
        await run_composite_demo()
    except RuntimeError as e:
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            print("检测到正在运行的事件循环。请在单元格中使用 'await run_composite_demo()'。")
        else:
            raise e

三、interrupt_on 参数 链接到标题

Human-in-the-loop(HITL) 人工干预 链接到标题

  1. Human-in-the-loop (HITL):对于 write_filedelegate_task 等关键操作,利用 LangGraph 的中断机制加入人工审批。

  2. 异步运行:DeepAgent 的任务通常耗时较长,务必使用异步 Webhooks 接收结果。

  3. 监控与调试:强烈建议结合 LangSmith 使用。由于 DeepAgent 内部有复杂的子 Agent 递归调用,使用 LangSmith 的 Tracing 功能是排查问题的有效手段。

  4. 后端挂载:在生产环境中,建议将 VFS 挂载到云端存储(如 S3),以防止容器重启导致 Agent 的"记忆"丢失。

  5. interrupt_on:这个参数其实是一个HITL的开关,就是把HITL的中间件插入到DeepAgent的执行流程中,当DeepAgent执行到需要人工审批的操作时,就会中断执行,等待人工审批。

    • 类型 : dict[str, bool | InterruptOnConfig]

    • 作用 : 映射“工具名称”到“中断配置”。

    • 示例 : interrupt_on={“write_file”: True} 表示当 Agent 试图调用 write_file 工具时,程序会暂停(Suspend),等待人工(Human)在 LangGraph 层面进行 Approve 、 Reject 或 Edit 操作后才能继续。

import os
import asyncio
from typing import Optional, Set
from langchain_openai import ChatOpenAI
from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langgraph.checkpoint.memory import InMemorySaver
from dotenv import load_dotenv
from langchain_tavily import TavilySearch
from langchain.agents.middleware.human_in_the_loop import (
    HITLResponse,
    ApproveDecision,
    EditDecision,
    RejectDecision
)
from langgraph.types import Command
from langchain_core.messages import BaseMessage, ToolMessage, AIMessage

load_dotenv(override=True)

async def run_interrupt_test():
    """
    示例 1: 基础中断功能 (封装版)
    在工具调用前中断,让用户确认是否继续执行
    """
    print("\n" + "="*80)
    print("📚 示例 1: interrupt_on 使用")
    print("="*80)
    print("\n功能:在工具调用前暂停,等待人工确认\n")

    # 创建 LLM 和工具
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    search_tool = TavilySearch(max_results=2)

    # 创建 Agent,设置在 "tools" 节点中断
    agent = create_deep_agent(
        model=llm,
        tools=[search_tool],
        backend=FilesystemBackend(root_dir="./workspace",virtual_mode=True),
        checkpointer=InMemorySaver(),  # 必需!用于支持中断和恢复
        interrupt_on={"tavily_search": True},  # 在特定工具调用时中断
    )

    # 定义任务
    task = "搜索 'Python 异步编程' 的最新信息,并创建一个总结文件"

    # 配置会话 ID
    # 为了避免之前的状态干扰,我们使用一个新的 thread_id
    config = {"configurable": {"thread_id": "demo_basic_refactored_v1"}}

    print(f"📋 任务: {task}\n")
    print("🚀 开始执行...\n")

    # 追踪已打印的消息数量,避免重复打印
    message_history_len = 0

    # --- 第一次执行 ---
    print("【第一次执行 - 预期会中断】")
    async for event in agent.astream({"messages": [("user", task)]}, config=config):
        if "messages" in event:
            current_messages = event["messages"]
            if len(current_messages) > message_history_len:
                # 打印新增的消息
                for i in range(message_history_len, len(current_messages)):
                    msg = current_messages[i]
                    if msg.type == "ai":
                        if hasattr(msg, 'tool_calls') and msg.tool_calls:
                            print(f"🔧 AI 决定调用工具: {msg.tool_calls[0]['name']}")
                            print(f"   参数: {msg.tool_calls[0]['args']}")
                        elif msg.content:
                            print(f"💬 AI: {msg.content}")
                    elif msg.type == "tool":
                        print(f"✅ 工具输出: {msg.content[:100]}..." if len(msg.content) > 100 else f"✅ 工具输出: {msg.content}")

                message_history_len = len(current_messages)

    # 检查是否中断
    # 使用 aget_state (async) 获取状态
    state = await agent.aget_state(config)
    print(f"\n⏸️  执行状态: {state.next}")

    if state.tasks:
        print(f"\n--- 🛑 执行已暂停 (HITL Middleware) ---")
        print(f"下一步骤 (Next): {state.next}")

        last_message = state.values["messages"][-1]

        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            tool_call = last_message.tool_calls[0]
            print(f"\n[待审批操作]:")
            print(f"  - 工具: {tool_call['name']}")
            print(f"  - 参数: {tool_call['args']}")

            # === 人工介入 ===
            approval = input("\n[管理员]: 是否批准执行此操作? (y/n/e[编辑]): ")

            if approval.lower() == 'y':
                print("\n[系统]: 操作已批准,继续执行...")

                hitl_response = HITLResponse(
                    decisions=[ApproveDecision(type="approve")]
                )

                # === 恢复执行 ===
                # 使用 Command(resume=...)
                async for event in agent.astream(
                    Command(resume=hitl_response),
                    config=config,
                    stream_mode="values"
                ):
                    if "messages" in event:
                        current_messages = event["messages"]
                        if len(current_messages) > message_history_len:
                            for i in range(message_history_len, len(current_messages)):
                                msg = current_messages[i]

                                # 优化打印逻辑,清晰展示 AI 回复
                                if msg.type == "tool":
                                    print(f"\n[工具输出]:\n{msg.content[:300]}..." if len(msg.content) > 300 else f"\n[工具输出]:\n{msg.content}")
                                elif msg.type == "ai":
                                    if msg.content:
                                        print(f"\n[AI 回复]:\n{msg.content}\n")
                                    elif msg.tool_calls:
                                        print(f"\n🔧 AI 决定调用工具: {msg.tool_calls[0]['name']}")
                                        print(f"   参数: {msg.tool_calls[0]['args']}")

                            message_history_len = len(current_messages)

            else:
                print("\n[系统]: 操作被拒绝或您选择了其他选项 (本演示仅处理 'y')。")
    else:
        print("流程已完成,没有触发中断。")
        if state.values.get("messages"):
            last_msg = state.values["messages"][-1]
            if last_msg.type == "ai" and last_msg.content:
                print(f"\n[最终回复]: {last_msg.content}")

if __name__ == "__main__":
    try:
        # asyncio.run(run_interrupt_test())
        await run_interrupt_test()
    except KeyboardInterrupt:
        print("\n程序已停止")

第五阶段、总结 链接到标题

DeepAgents 采用 中间件模式 来增强 Agent 能力,这些中间件在 create_deep_agent 时自动装配:

  • FilesystemMiddleware :

    • 提供 ls , read_file , write_file , edit_file , glob , grep , execute 等标准工具。

    • 亮点功能 : 大结果自动转存 。当工具(如搜索或爬虫)返回内容过长时,自动截断并保存到文件系统,仅返回文件路径给 LLM,极大节省 Token 并防止 Crash。

  • TodoListMiddleware :

    • 拦截 LLM 输出的 todo_list ,将其解析为结构化状态。

    • 支持任务的增删改查(CRUD)和状态流转(Pending -> Completed)。

  • SubAgentMiddleware :

    • 自动创建 task 工具。

    • 支持 General Purpose Agent (通用分身)和 Custom Agent (专家分身)。

    • 实现父子 Agent 间通过文件系统交换数据,无需序列化传递大量文本。

  • HumanInTheLoopMiddleware :

    • 通过 interrupt_on 参数配置。

    • 支持在特定工具调用前(如 write_file , execute )暂停,等待人工审批、修改或拒绝。

  • PatchToolCallsMiddleware :

    • 自动检测并修复 LLM 产生的“悬空”工具调用(Dangling Tool Calls),增强稳定性。

一句话总结 :DeepAgents 是 LangGraph 的 最佳实践参考实现 ,它将构建“类人智能体”所需的规划、记忆、工具使用等复杂逻辑,封装成了标准化的基础设施,让开发者专注于业务逻辑而非底层编排。