パイプライン
DAG モードは複雑な目標を有向非環グラフのステップに分解し、最大限の並列性で実行してから、目標が実際に達成されたかどうかを反映します。達成されていない場合は、再計画して再試行します — 自律的に、設定可能な予算まで。 パイプラインには、ループを形成する 4 つのフェーズがあります: 計画。 スマート LLM は、エンリッチされたクエリを明示的な依存関係エッジを持つ 2~6 個のステップに分解します。各ステップには、タスク説明、オプションのツールヒント、および高速 LLM またはスマート LLM で実行するかどうかを制御するモデルヒントが付与されます。 実行。 DAGExecutor は依存関係グラフを尊重しながら、独立したステップを並列に起動します (最大 5 個の同時実行)。各ステップはメモリのない自己完結型の ReAct エージェントとして実行されます — タスク説明と完了した依存関係の結果のみを受け取ります。 分析。 PlanAnalyzer は、実行されたプランが元の目標を達成したかどうかを評価し、構造化された判定を生成します:achieved (ブール値)、confidence (0.0~1.0)、reasoning、およびオプションの final_answer。
再計画。 目標が達成されず、信頼度が停止しきい値を下回っている場合、パイプラインは何が起こったか、何が間違っていたかをまとめた再計画コンテキストを使用して計画に戻ります。このループは DAG_MAX_REPLAN_ROUNDS 回まで自律的に実行されます。
全体を通じて 2 つの LLM が協力します: スマート LLM は計画、分析、および回答合成を処理します (高い推論能力が必要なタスク)。一方、高速 LLM はステップ実行とコンテキスト圧縮を処理します (コストとレイテンシがピーク推論能力よりも重要なタスク)。すべての構造化出力呼び出しは structured_llm_call を使用します。これはモデル固有の出力の癖に対処するための 3 レベルの低下チェーン (ネイティブ FC、JSON モード、正規表現フォールバック付きプレーンテキスト) を提供します。
LLM呼び出しマップ
完全なDAGパイプラインは、7つの異なるカテゴリーのLLM呼び出しを行います。各呼び出しがどこで発生するか、どのモデルがそれを処理するか、失敗時に何が起こるかを理解することは、デバッグとコスト最適化に不可欠です。| # | 呼び出しサイト | モジュール | LLMロール | フォーマット | フォールバック |
|---|---|---|---|---|---|
| 1 | 履歴サマリー | chat.py | 高速LLM | プレーンテキスト | 最後の20K文字を切り詰め |
| 2 | DAGPlanner | planner.py | スマートLLM | structured_llm_call | 3レベルの段階的低下 |
| 3 | ツール選択 | react.py | ステップLLM | structured_llm_call | すべてのツールを返す |
| 4 | ReActループ(ステップごと) | react.py | 高速/スマートLLM | chat() | リトライ/フォールバック |
| 5 | ContextGuard compact | context_guard.py | 高速LLM | プレーンテキスト | smart_truncate |
| 6 | PlanAnalyzer | analyzer.py | スマートLLM | structured_llm_call | regex + デフォルト |
| 7 | stream_synthesize | analyzer.py | スマートLLM | stream_chat() | analysis.final_answer |
model_hint: nullを持つステップは、モデルレジストリを介してスマートLLMに昇格させることができます。
DAGPlanner
プランナーの役割は、高レベルの目標を有効なDAGの具体的で実行可能なステップに変換することです。これは、スマートLLMへの単一のstructured_llm_callで実行されます。
プロンプト設計。 計画プロンプトは現在の日時と年を注入し(LLMが時間を考慮した検索を計画できるように)、言語マッチングを強制し(タスク説明は目標と同じ言語を使用する必要があります)、ステップ数を2~6に制限します。各ステップには5つのフィールドがあります:id、task、dependencies、tool_hint、およびmodel_hint。プロンプトは、些細に関連するサブタスクの分割を明示的に推奨しません。「複数のチェックを1つのスクリプトで実行できる場合は、それらを1つのステップに統合してください。」
構造化抽出。 プランナーは_PLAN_SCHEMAを使用してstructured_llm_callを実行します。これはsteps配列スキーマを定義し、parse_fnは生のdictをPlanStepオブジェクトに変換します。LLMが{"steps": [...]}ラッパーの代わりに単一のステップオブジェクトを返す場合、パーサーは自動的に復旧します。ReAct Engine — structured_llm_callに記載されている3レベルの段階的な低下チェーンは、プロバイダー間のモデル出力の特性に対応します。
DAG検証。 抽出後、プランナーはKahnのアルゴリズムを使用してトポロジカルソートで グラフ構造を検証します。2つの不変条件がチェックされます:
- ぶら下がり参照がない。 ステップが計画に存在しない依存関係IDを参照する場合、参照は警告ログとともに静かに削除されます。これは復旧メカニズムです。LLMは参照したステップを省略することがあり、ハード失敗は計画全体の呼び出しを無駄にします。
-
サイクルがない。 Kahnのアルゴリズムがすべてのノードにアクセスできない場合(少なくとも1つのサイクルが存在することを意味します)、プランナーは
ValueErrorを発生させます。サイクルは復旧不可能です。サイクリック計画は実行できません。
"fast"を割り当て、より深い推論が必要なステップにnullを割り当てます。エグゼキューターはこのヒントを使用して、ステップごとに適切なLLMを選択します。不確実な場合、プロンプトはLLMにnullを使用するよう指示します。より高性能なモデルを使用する方が常に安全です。
入力構築。 充実したクエリは、会話履歴と現在のリクエストを組み合わせます。会話が長い場合、履歴はDbMemory経由でロードされ、"Previous conversation: ..."としてフォーマットされます。結果の充実したクエリが16Kトークンを超える場合(CompactUtils.estimate_tokens経由で推定)、ContextGuardのplanner_inputヒントプロンプトを使用してLLMで要約され、プランナーに渡されます。高速LLMが利用できない場合のフォールバック:最後の20K文字にハード切り詰めします。
DAGExecutor
エグゼキューターは検証済みのExecutionPlan を受け取り、そのステップを並行実行し、依存関係エッジを尊重し、リソース制限を強制します。
並行実行モデル。 asyncio.Semaphore は並列ステップ実行を max_concurrency(デフォルト 5、MAX_CONCURRENCY 環境変数で設定可能)に制限します。ディスパッチループは依存関係が完了したすべてのステップを特定し、asyncio.Task インスタンスとして起動し、少なくとも 1 つが完了するまで待機してから再度チェックします。ステップは決定論的な動作を保証するため、ソート済み ID 順で起動されます。
ステップごとの ReAct エージェント。 各ステップは _resolve_agent() で作成された独立した ReAct エージェントとして実行されます。ステップに model_hint があり、それが ModelRegistry のロールと一致する場合、対応する LLM を持つ一時的なエージェントが作成されます。それ以外の場合は、デフォルトの高速 LLM エージェントが使用されます。これらのステップごとのエージェントはメモリを持たない — タスク説明、元のゴール、ツールヒント、完了した依存関係の結果のみで新たに開始されます。この分離は意図的です:DAG ステップはグラフ全体で状態をリークしない自己完結した作業単位であるべきです。
依存関係コンテキスト注入。 _build_step_context() は完了したすべての依存ステップの結果をテキストブロックにフォーマットします:各依存関係の ID、ステータス、タスク説明、および結果。ContextGuard が設定されており、結合されたコンテキストが max_message_chars を超える場合、[Dependency context truncated] サフィックス付きでハード切り詰めされます。これにより、複数の冗長な前のステップに依存するステップが独自のコンテキストウィンドウを超えることを防ぎます。
ステップタイムアウト。 各ステップは asyncio.wait_for でラップされ、デフォルトタイムアウトは 600 秒(10 分)です。ステップがこれを超える場合、キャンセルされ、タイムアウトメッセージで "failed" とマークされます。タイムアウトはプランごとではなくステップごとです — 5 ステップのプランは、ステップが順序実行される場合、理論的には 50 分間実行できます。
中断とキャンセル。 エグゼキューターには 2 つの異なるキャンセルパスがあり、それぞれ異なるイベントによってトリガーされます:
グレースフルスキップ — ストップイベント。 ユーザーが実行中にフォローアップメッセージを送信すると、chat.py のオーケストレーターが exec_stop_event を設定します。エグゼキューターはディスパッチサイクルの最上部でこのフラグをチェックします:設定されている場合、すべての残りの pending ステップは即座に "skipped" とマークされ、理由は "Skipped — user changed requirements" で、ループは終了します。既に実行中のステップは完了することが許可されます — 未開始のステップのみが放棄されます。この高速終了により、パイプラインは元のプラン全体が完了するのを待たずに、ユーザーの更新されたインテント周辺で再計画できます。
即座中止 — asyncio キャンセル。 HTTP クライアントが切断されると、chat.py は asyncio.Task.cancel() を介してトップレベルの run_task をキャンセルします。エグゼキューターは asyncio.CancelledError をキャッチし、現在実行中のすべてのステップタスクをキャンセルし、asyncio.gather(..., return_exceptions=True) を介して確認を待機してから再発生させます。クライアント切断は SSE イベントループ内で 0.5 秒ごとに await request.is_disconnected() をポーリングすることで検出されます。
セマンティックな違いは重要です:ストップイベントは「まだ開始されていないものをスキップするが、既に実行中のものは保持する」を意味します — 完了したステップ結果は再計画に情報を提供するために利用可能なままです。CancelledError は「すべてを即座に中止する」を意味します — すべての進行中の作業は結果回復なしでドロップされます。
デッドロック検出。 ディスパッチループが実行中のタスクがなく、起動準備ができたステップがない場合(依存関係が失敗したため)、すべての残りの pending ステップは "failed" とマークされ、依存関係が完了しなかった理由を説明するメッセージが付きます。これにより、エグゼキューターが無期限にハングすることを防ぎます。
進捗コールバック。 エグゼキューターは 3 つのイベントタイプに対して (step_id, event, data) コールバックを発火します:"started"(ステップ起動)、"iteration"(ステップ内のツール呼び出し)、および "completed"(ステップ完了)。chat.py の SSE レイヤーはこれらのコールバックを step_progress イベントにブリッジし、フロントエンドはリアルタイム DAG ビジュアライゼーションをレンダリングするために使用します。
PlanAnalyzer
実行されたプランが元の目標を達成したかどうかを評価するアナライザーです。4つのフィールドを持つ構造化されたAnalysisResult を生成します:
achieved(boolean) — 目標が完全に達成された場合のみtrue。confidence(float, 0.0-1.0) — アナライザーの評価の確実性。矛盾するソースはこのスコアを低下させます。final_answer(string or null) — 達成時の統合された回答、そうでない場合はnull。reasoning(string) — LLMのチェーン・オブ・ソート正当化。
structured_llm_call を _ANALYSIS_SCHEMA、型強制と信頼度クランピングを処理する parse_fn、および不正な形式のJSONに対する regex_fallback で使用します。正規表現フォールバック(_regex_extract_analysis)は、パターンマッチングを使用して部分的に有効なJSONから achieved、confidence、final_answer、および reasoning フィールドを抽出します。これが重要な理由は、分析応答は計画応答よりも長く複雑になる傾向があり、JSON形式エラーがより発生しやすいためです。
安全なデフォルト。 すべての抽出レベルが失敗した場合(ネイティブFC、JSONモード、プレーンテキスト、正規表現)、アナライザーは AnalysisResult(achieved=False, confidence=0.0, reasoning="Could not parse analysis response") を返します。これにより、パイプラインは常に使用可能な結果を取得します — 解析失敗は「達成されていない」という判定になり、クラッシュする代わりに再計画がトリガーされます。
ステップ結果のフォーマット。 各ステップの結果は分析プロンプトで10K文字に切り詰められます。これにより、単一のステップの詳細な出力(大規模なウェブスクレイプやファイルダンプなど)がアナライザーのコンテキストウィンドウを支配し、他のステップの結果を圧迫するのを防ぎます。
マルチソース比較。 分析プロンプトには、異なるソースからの結果を明示的に比較するディレクティブが含まれています。ウェブ検索結果、ナレッジベース検索、およびファイル操作がすべてデータを提供する場合、アナライザーは矛盾(異なる数字、日付、主張)にフラグを立て、どのソースがより信頼できるかを示す必要があります。矛盾は信頼度スコアを低下させ、これが再計画の決定に影響を与えます。
再計画
再計画ループは DAG エンジンの最も特徴的な機能です。部分的な失敗から自律的に回復し、何が問題だったかを反映して別のアプローチを試すことができます。 決定ロジック。 plan-execute-analyze の各ラウンド後、chat.py のオーケストレーターは分析結果を評価します:
achieved == True— ループを終了し、ストリーミング合成に進みます。- このラウンド中にユーザー注入が発生 — 信頼度または予算に関係なく、常に再計画します。ユーザーのフォローアップメッセージは、新しい試みを要求する要件変更として扱われます。これは自律再計画予算を消費しません。
- 自律再計画予算が枯渇 — ループを終了します。予算は
max_replan_rounds - 1自律再計画です(デフォルト:合計 3 ラウンドの予算から 2 回の自律再計画)。 confidence >= replan_stop_confidence— ループを終了します。目標が完全に達成されなかった場合でも、高い信頼度スコア(デフォルト閾値:0.8、DAG_REPLAN_STOP_CONFIDENCEで設定可能)は、アナライザーが何が起こったかについてかなり確信していることを示します — 再計画は役に立つ可能性は低いです。- その他の場合 — 再計画します。目標が達成されず、信頼度が低く、予算が残っています。
_format_replan_context() を呼び出して前のラウンドの要約を構築します。これには、アナライザーの推論と各ステップの結果の切り詰められたプレビュー(ステップあたり最大 500 文字)が含まれます。積極的な切り詰めは意図的です。プランナーは何が起こったかと何が問題だったかを知る必要があり、すべてのステップの出力の完全な詳細ではありません。このコンテキストは、元の充実したクエリとともに context パラメーターとして DAGPlanner.plan() に渡されます。
最大ラウンド数。 DAG_MAX_REPLAN_ROUNDS 環境変数(デフォルト 3)は、計画ラウンドの総数を制御します。デフォルト設定では、最初のラウンドは初期計画であり、最大 2 回の自律再計画が残ります。ユーザーがトリガーした再計画(メッセージ注入経由)はこの予算にカウントされません — ユーザーはパイプラインを無限に操舵できます。
SSE イベント。 パイプラインが再計画することを決定すると、アナライザーの推論を含む replanning フェーズイベントを発行します。フロントエンドはこれを使用して、パイプラインが再試行している理由をユーザーに表示します。
enriched_query の蓄積。 ユーザーのフォローアップメッセージはラウンド全体で充実したクエリに追加されます:enriched_query += "\n\n[User follow-up]: {content}"。これは、プランナーが修正された計画を構築するときに、ユーザーの意図の完全な進化 — 元のリクエストとそれに続くすべての明確化 — を見ることを意味します。
ストリーミング合成
アナライザーがゴール達成を確認すると(analysis.achieved == True)、パイプラインは PlanAnalyzer.stream_synthesize() を通じて合成された最終回答をユーザーにストリーミングします。
入力。 合成呼び出しは3つの入力を受け取ります:元のゴール、フォーマットされたステップ結果(ステップあたり最大10K文字)、およびノンストリーミング分析呼び出しからのアナライザーの推論です。推論は合成がカバーすべき内容の「ロードマップ」を提供します。
システムプロンプト。 合成プロンプトは、LLMにメタコメンタリーなしで直接回答するよう指示し(「‘結果に基づいて’のようなフレーズを含めないこと」)、元のゴールの言語に合わせ、該当する場合は異なるソースからの結果を比較するよう指示します。ユーザー設定から言語ディレクティブが利用可能な場合は追加されます。
ストリーミング。 メソッドは stream_chat() を使用してトークンを段階的に生成します。SSEレイヤーは各チャンクを status: "delta" の answer イベントでラップし、フロントエンドに最終回答のリアルタイムレンダリングを提供します。
フォールバックチェーン。 2つのフォールバックパスが失敗に対応します:
-
stream_synthesize が例外を発生させる — ノンストリーミング
analyze()呼び出しからanalysis.final_answerにフォールバックします。この回答は分析中に既に生成されているため、ストリーミング呼び出しが失敗しても利用可能です。 -
ゴール未達成(合成は試行されない) — すべての完了したステップ結果を連結し、水平線で区切ります。各結果にはそのステップIDが接頭辞として付きます。ステップが全く完了しなかった場合は、
"(goal not achieved)"を返します。
2つのLLMアーキテクチャ
DAGエンジンのコストとレイテンシプロファイルは、デュアルモデル設計によって形成されます。役割分担は以下の通りです:| 役割 | 用途 | 最適化対象 |
|---|---|---|
| Smart LLM | 計画、分析、回答の統合 | 推論能力 |
| Fast LLM | ステップ実行、コンテキスト圧縮、履歴要約 | コストとレイテンシ |
PlanStepのmodel_hintフィールドにより、プランナーは個別のステップをSmart LLMに昇格させることができます。model_hintがnullの場合、実行エンジンはデフォルトエージェント(Fast LLM)を使用します。"fast"の場合、実行エンジンはモデルレジストリを介してFast LLMを明示的に使用します。プランナーは決定論的なタスクに対して"fast"を設定し、複雑な推論に対してnullを設定するよう指示されていますが、ModelRegistryに登録されたカスタムロールに設定することもできます。モデル解決はステップごとに1回、そのステップのReActループが開始される直前に_resolve_agent()を介して行われます。ステップ内のすべての反復(ツール選択、ReActループ、ContextGuard圧縮)は、同じ解決されたLLMを使用します。モデルはステップの途中で変わることはありません。
予算の独立性。 Smart LLMとFast LLMは、それぞれのモデル設定から計算された独立したコンテキスト予算を持ちます。DAGステップ実行はFast LLMの予算を使用し、計画と分析の呼び出しはSmart LLMの予算を使用します。これは重要です。なぜなら、オペレーターは計画用に大規模コンテキストモデル(128K以上)とステップ実行用に小規模で高速なモデル(32K)をペアリングすることが多いためです。予算の計算方法の詳細については、コンテキスト管理 — 予算設定を参照してください。