管道
DAG 模式将复杂目标分解为有向无环图 (DAG) 的步骤,以最大并行度执行它们,然后反思是否真正实现了目标。如果没有,它会重新规划并再次尝试 — 自主进行,直到达到可配置的预算。 管道有四个阶段形成一个循环: 规划。 智能 LLM 将富化查询分解为 2-6 个步骤,具有显式依赖边。每个步骤都获得任务描述、可选工具提示和模型提示,该提示控制它是在快速还是智能 LLM 上运行。 执行。 DAGExecutor 并行启动独立步骤(最多 5 个并发),遵守依赖图。每个步骤作为自包含的 ReAct 智能体运行,没有内存 — 它仅接收其任务描述和已完成依赖的结果。 分析。 PlanAnalyzer 评估执行的计划是否实现了原始目标,生成结构化的判决:achieved(布尔值)、confidence(0.0-1.0)、reasoning 和可选的 final_answer。
重新规划。 如果目标未实现且置信度低于停止阈值,管道循环回规划,使用重新规划上下文来总结发生了什么以及哪里出错了。此循环自主运行最多 DAG_MAX_REPLAN_ROUNDS 次。
两个 LLM 全程协作:智能 LLM 处理规划、分析和答案合成(需要高推理能力的任务),而快速 LLM 处理步骤执行和上下文压缩(成本和延迟比峰值推理更重要的任务)。每个结构化输出调用都使用 structured_llm_call,它提供 3 级降级链(Native FC、JSON Mode、带正则表达式回退的纯文本)来处理特定模型的输出怪癖。
LLM 调用映射
完整的 DAG 管道包含七个不同类别的 LLM 调用。理解每个调用发生的位置、由哪个模型处理以及失败时的处理方式对于调试和成本优化至关重要。| # | 调用位置 | 模块 | LLM 角色 | 格式 | 降级方案 |
|---|---|---|---|---|---|
| 1 | 历史摘要 | chat.py | 快速 LLM | 纯文本 | 截断最后 20K 字符 |
| 2 | DAGPlanner | planner.py | 智能 LLM | structured_llm_call | 3 级降级 |
| 3 | 工具选择 | react.py | 步骤 LLM | structured_llm_call | 返回所有工具 |
| 4 | ReAct 循环(每步) | react.py | 快速/智能 LLM | chat() | 重试/降级 |
| 5 | ContextGuard 压缩 | context_guard.py | 快速 LLM | 纯文本 | smart_truncate |
| 6 | PlanAnalyzer | analyzer.py | 智能 LLM | structured_llm_call | 正则表达式 + 默认值 |
| 7 | stream_synthesize | analyzer.py | 智能 LLM | stream_chat() | analysis.final_answer |
model_hint: null 的步骤可以通过模型注册表提升为智能 LLM。
DAGPlanner
规划器的工作是将高层目标转化为有效的 DAG,包含具体、可执行的步骤。它通过单次structured_llm_call 调用智能 LLM 来完成这项工作。
提示设计。 规划提示注入当前日期和年份(使 LLM 能够规划时间感知搜索),强制语言匹配(任务描述必须使用与目标相同的语言),并将步骤数限制在 2-6 之间。每个步骤有五个字段:id、task、dependencies、tool_hint 和 model_hint。提示明确反对拆分平凡相关的子任务——“如果多个检查可以在单个脚本中完成,将它们合并为一个步骤。”
结构化提取。 规划器使用 structured_llm_call 和 _PLAN_SCHEMA,定义 steps 数组架构和 parse_fn,将原始字典转换为 PlanStep 对象。如果 LLM 返回单个步骤对象而不是 {"steps": [...]} 包装器,解析器会自动恢复。ReAct 引擎 — structured_llm_call 中记录的 3 级降级链处理跨提供商的模型输出异常。
DAG 验证。 提取后,规划器使用 Kahn 算法进行拓扑排序来验证图结构。检查两个不变量:
- 无悬空引用。 如果步骤引用计划中不存在的依赖 ID,该引用将被静默删除并记录警告。这是一个恢复机制——LLM 有时会忽略它们引用的步骤,硬失败会浪费整个规划调用。
-
无循环。 如果 Kahn 算法无法访问所有节点(意味着至少存在一个循环),规划器会抛出
ValueError。循环是不可恢复的——循环计划无法执行。
"fast" 分配给它认为简单且确定性的步骤(数据查找、格式转换、直接检索),将 null 分配给需要更深层推理的步骤。执行器使用此提示为每个步骤选择适当的 LLM。如有疑问,提示指示 LLM 使用 null——使用更强大的模型总是更安全。
输入构造。 丰富的查询将对话历史与当前请求结合。如果对话很长,历史通过 DbMemory 加载并格式化为 "Previous conversation: ..."。当生成的丰富查询超过 16K 令牌(通过 CompactUtils.estimate_tokens 估计)时,它会使用 ContextGuard 中的 planner_input 提示通过 LLM 进行总结,然后传递给规划器。当没有可用的快速 LLM 时的备选方案:硬截断到最后 20K 字符。
DAGExecutor
执行器接收一个经过验证的ExecutionPlan 并并发运行其步骤,同时尊重依赖边并强制执行资源限制。
并发模型。 一个 asyncio.Semaphore 将并行步骤执行限制为 max_concurrency(默认为 5,可通过 MAX_CONCURRENCY 环境变量配置)。调度循环识别所有依赖项已完成的步骤,将它们作为 asyncio.Task 实例启动,并等待至少一个完成后再次检查。步骤按排序的 ID 顺序启动以确保确定性行为。
每步 ReAct 智能体。 每个步骤作为由 _resolve_agent() 创建的独立 ReAct 智能体运行。如果步骤有一个 model_hint 与 ModelRegistry 中的角色匹配,则创建一个具有相应 LLM 的临时智能体。否则,使用默认的快速 LLM 智能体。这些每步智能体没有记忆 — 它们从零开始,仅包含任务描述、原始目标、任何工具提示和已完成依赖的结果。这种隔离是有意的:DAG 步骤应该是自包含的工作单元,不会在图中泄漏状态。
依赖上下文注入。 _build_step_context() 将所有已完成依赖步骤的结果格式化为文本块:每个依赖的 ID、状态、任务描述和结果。如果配置了 ContextGuard 且组合上下文超过 max_message_chars,则使用 [Dependency context truncated] 后缀进行硬截断。这防止依赖于多个冗长前置步骤的步骤超出其自身的上下文窗口。
步骤超时。 每个步骤都用 asyncio.wait_for 包装,默认超时为 600 秒(10 分钟)。如果步骤超过此时间,它将被取消并标记为 "failed",并显示超时消息。超时是按步骤的,不是按计划的 — 一个 5 步计划如果步骤按顺序执行,理论上可以运行 50 分钟。
中断和取消。 执行器有两条不同的取消路径,每条由不同的事件触发:
优雅跳过 — 停止事件。 当用户在执行期间发送后续消息时,chat.py 中的编排器设置 exec_stop_event。执行器在每个调度周期的顶部检查此标志:如果设置,所有剩余的 pending 步骤立即标记为 "skipped",原因为 "Skipped — user changed requirements",循环退出。已运行的步骤允许完成 — 只有未启动的步骤被放弃。这种快速退出让管道能够围绕用户的更新意图重新规划,而无需等待完整的原始计划完成。
立即中止 — asyncio 取消。 当 HTTP 客户端断开连接时,chat.py 通过 asyncio.Task.cancel() 取消顶级 run_task。执行器捕获 asyncio.CancelledError,取消所有当前运行的步骤任务,通过 asyncio.gather(..., return_exceptions=True) 等待它们确认,然后重新抛出。客户端断开连接通过在 SSE 事件循环内每 0.5 秒轮询一次 await request.is_disconnected() 来检测。
语义差异很重要:停止事件意味着”跳过尚未启动的内容,但保留已运行的内容” — 已完成的步骤结果保持可用以通知重新规划。CancelledError 意味着”立即中止所有内容” — 所有进行中的工作被丢弃,无法恢复结果。
死锁检测。 如果调度循环发现没有任务运行且没有步骤准备启动(因为它们的依赖失败),所有剩余的待处理步骤都标记为 "failed",并显示解释其依赖从未完成的消息。这防止执行器无限期挂起。
进度回调。 执行器为三种事件类型触发 (step_id, event, data) 回调:"started"(步骤启动)、"iteration"(步骤内的工具调用)和 "completed"(步骤完成)。chat.py 中的 SSE 层将这些回调桥接到前端用于渲染实时 DAG 可视化的 step_progress 事件。
PlanAnalyzer
分析器评估执行的计划是否达成了原始目标。它生成一个结构化的AnalysisResult,包含四个字段:
achieved(布尔值) — 仅当目标完全实现时为true。confidence(浮点数,0.0-1.0) — 分析器对其评估的确定程度。相互矛盾的来源会降低此分数。final_answer(字符串或 null) — 当达成目标时的综合答案,否则为null。reasoning(字符串) — LLM 的思维链推理过程。
structured_llm_call 配合 _ANALYSIS_SCHEMA、处理类型强制转换和置信度限制的 parse_fn,以及用于格式错误 JSON 的 regex_fallback。正则表达式回退 (_regex_extract_analysis) 使用模式匹配从部分有效的 JSON 中提取 achieved、confidence、final_answer 和 reasoning 字段。这很重要,因为分析响应往往比规划响应更长更复杂,使得 JSON 格式错误更容易发生。
安全默认值。 如果所有提取级别都失败(原生 FC、JSON 模式、纯文本、正则表达式),分析器返回 AnalysisResult(achieved=False, confidence=0.0, reasoning="Could not parse analysis response")。这确保管道始终获得可用的结果——解析失败变成”未达成”的判决,触发重新规划而不是崩溃。
步骤结果格式化。 每个步骤的结果在分析提示中被截断为 10K 字符。这可以防止单个步骤的冗长输出(例如大型网页抓取或文件转储)主导分析器的上下文窗口并挤占其他步骤结果的空间。
多源比较。 分析提示包含一条指令,要求明确比较来自不同来源的结果。当网页搜索结果、知识库检索和文件操作都贡献数据时,分析器必须标记矛盾(不同的数字、日期、声明),并指出哪个来源可能更权威。矛盾会降低置信度分数,进而影响重新规划的决策。
重新规划
重新规划循环是 DAG 引擎最独特的特性:它可以通过反思出了什么问题并尝试不同的方法来自主恢复部分故障。 决策逻辑。 在每一轮计划-执行-分析之后,chat.py 中的编排器评估分析结果:
achieved == True— 退出循环,继续进行流式合成。- 此轮期间发生用户注入 — 始终重新规划,无论置信度或预算如何。用户后续消息被视为需求变更,需要重新尝试。这不会消耗自主重新规划预算。
- 自主重新规划预算已耗尽 — 退出循环。预算为
max_replan_rounds - 1次自主重新规划(默认:从总共 3 轮中进行 2 次自主重新规划)。 confidence >= replan_stop_confidence— 退出循环。即使目标未完全实现,高置信度分数(默认阈值:0.8,可通过DAG_REPLAN_STOP_CONFIDENCE配置)表明分析器对发生的情况相当确定 — 重新规划不太可能有帮助。- 其他情况 — 重新规划。目标未实现,置信度低,预算仍有余额。
_format_replan_context() 来构建前一轮的摘要。这包括分析器的推理和每个步骤结果的截断预览(每个步骤最多 500 个字符)。激进的截断是有意的:规划器需要知道发生了什么和哪里出错了,而不是每个步骤输出的完整细节。此上下文与原始富化查询一起作为 context 参数传递给 DAGPlanner.plan()。
最大轮数。 DAG_MAX_REPLAN_ROUNDS 环境变量(默认 3)控制规划轮数的总数。使用默认设置,第一轮是初始规划,最多留下 2 次自主重新规划。用户触发的重新规划(通过消息注入)不计入此预算 — 用户可以无限期地引导管道。
SSE 事件。 当管道决定重新规划时,它会发出包含分析器推理的 replanning 阶段事件。前端使用此事件向用户显示管道重试的原因。
enriched_query 累积。 用户后续消息在各轮中附加到富化查询:enriched_query += "\n\n[User follow-up]: {content}"。这意味着规划器在构建修订计划时看到用户意图的完整演变 — 原始请求加上所有后续澄清。
流式合成
当分析器确认目标已实现(analysis.achieved == True)时,管道通过 PlanAnalyzer.stream_synthesize() 向用户流式传输合成的最终答案。
输入。 合成调用接收三个输入:原始目标、格式化的步骤结果(每个步骤最多 10K 字符)和分析器来自非流式分析调用的推理。推理提供了合成应涵盖内容的”路线图”。
系统提示。 合成提示指示 LLM 直接回答,不包含元注释(“不要包含诸如’基于结果’之类的短语”),匹配原始目标的语言,并在适用时比较来自不同来源的结果。如果可用,将从用户偏好设置追加语言指令。
流式传输。 该方法使用 stream_chat() 逐步生成令牌。SSE 层将每个块包装在带有 status: "delta" 的 answer 事件中,为前端提供最终答案的实时渲染。
回退链。 两个回退路径处理失败:
-
stream_synthesize 引发异常 — 回退到非流式
analyze()调用中的analysis.final_answer。此答案已在分析期间生成,因此即使流式调用失败也可用。 -
目标未实现(未尝试合成) — 连接所有已完成的步骤结果,用水平线分隔。每个结果以其步骤 ID 为前缀。如果根本没有步骤完成,返回
"(goal not achieved)"。
双LLM架构
DAG引擎的成本和延迟特性由其双模型设计决定。分工如下:| 角色 | 用途 | 优化方向 |
|---|---|---|
| Smart LLM | 规划、分析、答案合成 | 推理能力 |
| Fast LLM | 步骤执行、上下文压缩、历史总结 | 成本和延迟 |
PlanStep上的model_hint字段允许规划器将单个步骤提升到Smart LLM。当model_hint为null时,执行器使用默认智能体(Fast LLM)。当其为"fast"时,执行器通过模型注册表显式使用Fast LLM。规划器被指示为确定性任务设置"fast",为复杂推理设置null,但也可以设置为ModelRegistry中注册的任何自定义角色。模型解析在每个步骤一次通过_resolve_agent()进行,时间在该步骤的ReAct循环开始之前——步骤内的所有迭代(工具选择、ReAct循环、ContextGuard压缩)使用相同的已解析LLM。模型在步骤中途不会改变。
预算独立性。 Smart和Fast LLM具有独立的上下文预算,从各自的模型配置计算得出。DAG步骤执行使用Fast LLM的预算;规划和分析调用使用Smart LLM的预算。这很重要,因为操作员通常将大上下文模型(128K+)用于规划,将较小、更快的模型(32K)用于步骤执行。有关预算计算方式的详细信息,请参阅上下文管理——预算配置。