Die Pipeline
DAG-Modus zerlegt ein komplexes Ziel in einen gerichteten azyklischen Graphen von Schritten, führt sie mit maximaler Parallelität aus und überprüft dann, ob das Ziel tatsächlich erreicht wurde. Falls nicht, plant er neu und versucht es erneut — autonom, bis zu einem konfigurierbaren Budget. Die Pipeline hat vier Phasen, die eine Schleife bilden: Planung. Das intelligente LLM zerlegt die angereicherte Abfrage in 2–6 Schritte mit expliziten Abhängigkeitskanten. Jeder Schritt erhält eine Aufgabenbeschreibung, einen optionalen Tool-Hinweis und einen Modell-Hinweis, der steuert, ob er auf dem schnellen oder intelligenten LLM ausgeführt wird. Ausführung. Der DAGExecutor startet unabhängige Schritte parallel (bis zu 5 gleichzeitig) und respektiert dabei den Abhängigkeitsgraphen. Jeder Schritt läuft als eigenständiger ReAct-Agent ohne Speicher — er erhält nur seine Aufgabenbeschreibung und die Ergebnisse seiner abgeschlossenen Abhängigkeiten. Analyse. Der PlanAnalyzer bewertet, ob der ausgeführte Plan das ursprüngliche Ziel erreicht hat, und erzeugt ein strukturiertes Urteil:achieved (boolescher Wert), confidence (0,0–1,0), reasoning und eine optionale final_answer.
Neuplanung. Wenn das Ziel nicht erreicht wurde und die Konfidenz unter der Stoppgrenze liegt, kehrt die Pipeline zur Planung mit einem Replan-Kontext zurück, der zusammenfasst, was passiert ist und was schiefgelaufen ist. Diese Schleife läuft bis zu DAG_MAX_REPLAN_ROUNDS Mal autonom.
Zwei LLMs arbeiten durchgehend zusammen: Ein intelligentes LLM kümmert sich um Planung, Analyse und Antwantsynthese (Aufgaben, die hohe Reasoning-Fähigkeit erfordern), während ein schnelles LLM die Schrittausführung und Kontextkomprimierung handhabt (Aufgaben, bei denen Kosten und Latenz wichtiger sind als maximale Reasoning-Leistung). Jeder strukturierte Ausgabeaufruf verwendet structured_llm_call, das eine 3-stufige Degradationskette (Native FC, JSON Mode, Klartext mit Regex-Fallback) bietet, um modellspezifische Ausgabeeigentümlichkeiten zu handhaben.
LLM-Aufrufe-Übersicht
Die vollständige DAG-Pipeline führt sieben unterschiedliche Kategorien von LLM-Aufrufen durch. Das Verständnis dafür, wo jeder Aufruf stattfindet, welches Modell ihn verarbeitet und was bei Fehlern geschieht, ist für Debugging und Kostenoptimierung unerlässlich.| # | Aufrufstelle | Modul | LLM-Rolle | Format | Fallback |
|---|---|---|---|---|---|
| 1 | Verlaufszusammenfassung | chat.py | schnelles LLM | Klartext | letzte 20K Zeichen kürzen |
| 2 | DAGPlanner | planner.py | intelligentes LLM | structured_llm_call | 3-stufiger Abbau |
| 3 | Werkzeugauswahl | react.py | Schritt-LLM | structured_llm_call | alle Werkzeuge zurückgeben |
| 4 | ReAct-Schleife (pro Schritt) | react.py | schnelles/intelligentes LLM | chat() | Wiederholung/Fallback |
| 5 | ContextGuard kompakt | context_guard.py | schnelles LLM | Klartext | smart_truncate |
| 6 | PlanAnalyzer | analyzer.py | intelligentes LLM | structured_llm_call | regex + Standard |
| 7 | stream_synthesize | analyzer.py | intelligentes LLM | stream_chat() | analysis.final_answer |
model_hint: null über die Modellregistrierung zum intelligenten LLM hochgestuft werden kann.
DAGPlanner
Die Aufgabe des Planers besteht darin, ein übergeordnetes Ziel in einen gültigen DAG aus konkreten, umsetzbaren Schritten umzuwandeln. Dies geschieht mit einem einzigenstructured_llm_call zum intelligenten LLM.
Prompt-Design. Der Planungs-Prompt injiziert das aktuelle Datum und Jahr (damit das LLM zeitbewusste Suchen planen kann), erzwingt Sprachübereinstimmung (Aufgabenbeschreibungen müssen die gleiche Sprache wie das Ziel verwenden) und begrenzt die Schrittanzahl auf 2-6. Jeder Schritt hat fünf Felder: id, task, dependencies, tool_hint und model_hint. Der Prompt rät ausdrücklich davon ab, trivial verwandte Teilaufgaben aufzuteilen — „wenn mehrere Überprüfungen in einem einzigen Skript durchgeführt werden können, kombinieren Sie sie in EINEM Schritt.”
Strukturierte Extraktion. Der Planer verwendet structured_llm_call mit einem _PLAN_SCHEMA, das das steps-Array-Schema definiert, und eine parse_fn, die rohe Dicts in PlanStep-Objekte konvertiert. Falls das LLM ein einzelnes Step-Objekt statt eines {"steps": [...]} Wrappers zurückgibt, erfolgt eine automatische Wiederherstellung durch den Parser. Die 3-stufige Degradationskette, die in ReAct Engine — structured_llm_call dokumentiert ist, behandelt Modellausgabe-Besonderheiten über verschiedene Anbieter hinweg.
DAG-Validierung. Nach der Extraktion validiert der Planer die Graphstruktur mit Kahns Algorithmus zur topologischen Sortierung. Zwei Invarianten werden überprüft:
- Keine hängenden Referenzen. Falls ein Schritt auf eine Abhängigkeits-ID verweist, die nicht im Plan vorhanden ist, wird die Referenz stillschweigend gelöscht und eine Warnung protokolliert. Dies ist ein Wiederherstellungsmechanismus — LLMs lassen manchmal Schritte weg, auf die sie verwiesen haben, und ein hartes Fehlschlag würde den gesamten Planungsaufruf verschwenden.
-
Keine Zyklen. Falls Kahns Algorithmus nicht alle Knoten besuchen kann (was bedeutet, dass mindestens ein Zyklus vorhanden ist), wirft der Planer einen
ValueError. Zyklen sind nicht wiederherstellbar — ein zyklischer Plan kann nicht ausgeführt werden.
"fast" Schritten zu, die er als einfach und deterministisch betrachtet (Datenbeschaffung, Formatkonvertierung, unkomplizierte Abfrage) und null Schritten, die tiefere Überlegungen erfordern. Der Executor verwendet diesen Hinweis, um das geeignete LLM pro Schritt auszuwählen. Im Zweifelsfall weist der Prompt das LLM an, null zu verwenden — es ist immer sicherer, das leistungsfähigere Modell zu nutzen.
Eingabekonstruktion. Die angereicherte Abfrage kombiniert Gesprächsverlauf mit der aktuellen Anfrage. Falls das Gespräch lang ist, wird der Verlauf über DbMemory geladen und als "Previous conversation: ..." formatiert. Wenn die resultierende angereicherte Abfrage 16K Token überschreitet (geschätzt via CompactUtils.estimate_tokens), wird sie mit dem planner_input-Hinweis-Prompt von ContextGuard LLM-zusammengefasst, bevor sie an den Planer übergeben wird. Das Fallback, wenn kein schnelles LLM verfügbar ist: hart auf die letzten 20K Zeichen kürzen.
DAGExecutor
Der Executor nimmt einen validiertenExecutionPlan und führt seine Schritte gleichzeitig aus, wobei er Abhängigkeitskanten respektiert und Ressourcenlimits durchsetzt.
Concurrency-Modell. Ein asyncio.Semaphore begrenzt die parallele Schrittausführung auf max_concurrency (Standard 5, konfigurierbar über die Umgebungsvariable MAX_CONCURRENCY). Die Dispatch-Schleife identifiziert alle Schritte, deren Abhängigkeiten abgeschlossen sind, startet sie als asyncio.Task-Instanzen und wartet, bis mindestens einer fertig ist, bevor sie erneut prüft. Schritte werden in sortierter ID-Reihenfolge gestartet, um deterministisches Verhalten zu gewährleisten.
Pro-Schritt ReAct-Agent. Jeder Schritt läuft als unabhängiger ReAct-Agent, der von _resolve_agent() erstellt wird. Wenn der Schritt einen model_hint hat, der einer Rolle in der ModelRegistry entspricht, wird ein temporärer Agent mit dem entsprechenden LLM erstellt. Andernfalls wird der Standard-Fast-LLM-Agent verwendet. Diese Pro-Schritt-Agenten haben kein Gedächtnis — sie starten neu mit nur ihrer Aufgabenbeschreibung, dem ursprünglichen Ziel, eventuellen Tool-Hinweisen und den Ergebnissen abgeschlossener Abhängigkeiten. Diese Isolation ist beabsichtigt: DAG-Schritte sollten eigenständige Arbeitseinheiten sein, die keinen Status über den Graph hinweg durchsickern lassen.
Abhängigkeitskontexteinspeisung. _build_step_context() formatiert die Ergebnisse aller abgeschlossenen Abhängigkeitsschritte in einen Textblock: die ID, der Status, die Aufgabenbeschreibung und das Ergebnis jeder Abhängigkeit. Wenn ein ContextGuard konfiguriert ist und der kombinierte Kontext max_message_chars überschreitet, wird er hart gekürzt mit einem [Dependency context truncated]-Suffix. Dies verhindert, dass ein Schritt, der von mehreren ausführlichen Vorgängern abhängt, sein eigenes Kontextfenster sprengt.
Schritt-Timeout. Jeder Schritt wird in asyncio.wait_for mit einem Standard-Timeout von 600 Sekunden (10 Minuten) eingewickelt. Wenn ein Schritt diesen überschreitet, wird er abgebrochen und als "failed" mit einer Timeout-Nachricht markiert. Das Timeout ist pro Schritt, nicht pro Plan — ein 5-Schritt-Plan kann theoretisch 50 Minuten laufen, wenn Schritte sequenziell ausgeführt werden.
Unterbrechung und Abbruch. Der Executor hat zwei unterschiedliche Abbruchpfade, die jeweils durch ein anderes Ereignis ausgelöst werden:
Sanfter Übersprung — Stop-Ereignis. Wenn ein Benutzer während der Ausführung eine Folgefrage sendet, setzt der Orchestrator in chat.py exec_stop_event. Der Executor prüft dieses Flag am Anfang jedes Dispatch-Zyklus: wenn gesetzt, werden alle verbleibenden pending-Schritte sofort als "skipped" mit dem Grund "Skipped — user changed requirements" markiert und die Schleife beendet. Bereits laufende Schritte dürfen sich vollenden — nur nicht gestartete Schritte werden aufgegeben. Dieser schnelle Ausstieg ermöglicht es der Pipeline, um die aktualisierte Absicht des Benutzers neu zu planen, ohne auf die vollständige Ausführung des ursprünglichen Plans zu warten.
Sofortiger Abbruch — asyncio-Abbruch. Wenn der HTTP-Client die Verbindung trennt, bricht chat.py den Top-Level-run_task über asyncio.Task.cancel() ab. Der Executor fängt asyncio.CancelledError ab, bricht alle derzeit laufenden Schritt-Tasks ab, wartet auf deren Bestätigung über asyncio.gather(..., return_exceptions=True) und wirft dann erneut. Die Client-Trennung wird erkannt, indem alle 0,5 Sekunden await request.is_disconnected() in der SSE-Ereignisschleife abgerufen wird.
Der semantische Unterschied ist wichtig: Stop-Ereignis bedeutet „überspringe, was nicht gestartet wurde, aber bewahre, was bereits läuft” — abgeschlossene Schrittergebnisse bleiben verfügbar, um die Neuplanung zu informieren. CancelledError bedeutet „breche alles sofort ab” — alle laufenden Arbeiten werden ohne Ergebniswiederherstellung verworfen.
Deadlock-Erkennung. Wenn die Dispatch-Schleife keine laufenden Tasks und keine gestarteten Schritte findet (weil ihre Abhängigkeiten fehlgeschlagen sind), werden alle verbleibenden ausstehenden Schritte als "failed" mit einer Nachricht markiert, die erklärt, dass ihre Abhängigkeiten nie abgeschlossen wurden. Dies verhindert, dass der Executor auf unbestimmte Zeit hängen bleibt.
Fortschritts-Callbacks. Der Executor feuert (step_id, event, data)-Callbacks für drei Ereignistypen: "started" (Schritt gestartet), "iteration" (Tool-Aufruf innerhalb eines Schritts) und "completed" (Schritt beendet). Die SSE-Schicht in chat.py verbindet diese Callbacks mit step_progress-Ereignissen, die das Frontend zur Darstellung der Echtzeit-DAG-Visualisierung verwendet.
PlanAnalyzer
Der Analyzer bewertet, ob der ausgeführte Plan das ursprüngliche Ziel erreicht hat. Er erzeugt ein strukturiertesAnalysisResult mit vier Feldern:
achieved(boolean) —truenur wenn das Ziel vollständig erreicht wurde.confidence(float, 0.0-1.0) — wie sicher sich der Analyzer in seiner Bewertung ist. Widersprüchliche Quellen senken diesen Wert.final_answer(string oder null) — eine synthetisierte Antwort bei Erfolg,nullandernfalls.reasoning(string) — die Chain-of-Thought-Begründung des LLM.
structured_llm_call mit _ANALYSIS_SCHEMA, einer parse_fn, die Typkonvertierung und Confidence-Begrenzung handhabt, und einem regex_fallback für fehlerhaft formatiertes JSON. Der Regex-Fallback (_regex_extract_analysis) extrahiert achieved, confidence, final_answer und reasoning Felder aus teilweise gültigem JSON mittels Pattern Matching. Dies ist wichtig, da Analyseantworten tendenziell länger und komplexer sind als Planungsantworten, was JSON-Formatierungsfehler wahrscheinlicher macht.
Sichere Standardeinstellung. Falls alle Extraktionsebenen fehlschlagen (native FC, JSON-Modus, Klartext, Regex), gibt der Analyzer AnalysisResult(achieved=False, confidence=0.0, reasoning="Could not parse analysis response") zurück. Dies stellt sicher, dass die Pipeline immer ein verwertbares Ergebnis erhält — ein Parse-Fehler wird zu einem „nicht erreicht”-Urteil, das eine Neuplanung auslöst, anstatt abzustürzen.
Schritt-Ergebnis-Formatierung. Das Ergebnis jedes Schritts wird in der Analyseeingabe auf 10K Zeichen gekürzt. Dies verhindert, dass eine einzelne Schritt-Ausgabe (z. B. ein großer Web-Scrape oder Datendump) das Kontextfenster des Analyzers dominiert und andere Schritt-Ergebnisse verdrängt.
Vergleich mehrerer Quellen. Die Analyseeingabe enthält eine Direktive, um Ergebnisse aus verschiedenen Quellen explizit zu vergleichen. Wenn Web-Suchergebnisse, Wissensbank-Abruf und Dateivorgänge alle Daten beitragen, muss der Analyzer Widersprüche kennzeichnen (unterschiedliche Zahlen, Daten, Aussagen) und angeben, welche Quelle wahrscheinlich zuverlässiger ist. Widersprüche senken den Confidence-Wert, was wiederum die Neuplanungsentscheidung beeinflusst.
Neuplanung
Die Neuplanungsschleife ist das charakteristischste Merkmal der DAG-Engine: Sie kann sich autonom von Teilfehlern erholen, indem sie reflektiert, was schief gelaufen ist, und einen anderen Ansatz versucht. Entscheidungslogik. Nach jeder Runde von Plan-Ausführung-Analyse bewertet der Orchestrator inchat.py das Analyseergebnis:
achieved == True— Schleife beenden, zur Streaming-Synthese übergehen.- Benutzereinspruch während dieser Runde — immer neu planen, unabhängig von Konfidenz oder Budget. Nachfolgende Benutzernachrichten werden als Anforderungsänderungen behandelt, die einen neuen Versuch erfordern. Dies verbraucht nicht das autonome Neuplanungsbudget.
- Autonomes Neuplanungsbudget erschöpft — Schleife beenden. Das Budget beträgt
max_replan_rounds - 1autonome Neuplanungen (Standard: 2 autonome Neuplanungen aus einem Budget von 3 Gesamtrunden). confidence >= replan_stop_confidence— Schleife beenden. Auch wenn das Ziel nicht vollständig erreicht wurde, deutet ein hoher Konfidenzwert (Standard-Schwellenwert: 0,8, konfigurierbar überDAG_REPLAN_STOP_CONFIDENCE) darauf hin, dass der Analyzer sich ziemlich sicher ist, was passiert ist — Neuplanung wird wahrscheinlich nicht helfen.- Andernfalls — neu planen. Das Ziel wurde nicht erreicht, die Konfidenz ist niedrig und das Budget bleibt erhalten.
_format_replan_context() auf, um eine Zusammenfassung der vorherigen Runde zu erstellen. Dies umfasst die Begründung des Analyzers und eine gekürzte Vorschau des Ergebnisses jedes Schritts (maximal 500 Zeichen pro Schritt). Die aggressive Kürzung ist beabsichtigt: Der Planer muss wissen, was passiert ist und was schief gelaufen ist, nicht die vollständigen Details der Ausgabe jedes Schritts. Dieser Kontext wird an DAGPlanner.plan() als context-Parameter zusammen mit der ursprünglichen angereicherten Abfrage übergeben.
Maximale Runden. Die Umgebungsvariable DAG_MAX_REPLAN_ROUNDS (Standard 3) steuert die Gesamtzahl der Planungsrunden. Mit Standardeinstellungen ist die erste Runde der ursprüngliche Plan, was bis zu 2 autonome Neuplanungen ermöglicht. Benutzergesteuerte Neuplanungen (über Nachrichteneinspruch) werden nicht gegen dieses Budget angerechnet — ein Benutzer kann die Pipeline unbegrenzt steuern.
SSE-Ereignis. Wenn die Pipeline sich entscheidet, neu zu planen, sendet sie ein replanning-Phasenereignis mit der Begründung des Analyzers. Das Frontend nutzt dies, um dem Benutzer anzuzeigen, warum die Pipeline erneut versucht wird.
enriched_query-Akkumulation. Nachfolgende Benutzernachrichten werden über Runden hinweg an die angereicherte Abfrage angehängt: enriched_query += "\n\n[User follow-up]: {content}". Dies bedeutet, dass der Planer die vollständige Entwicklung der Benutzerabsicht sieht — die ursprüngliche Anfrage plus alle nachfolgenden Klarstellungen — wenn er einen überarbeiteten Plan erstellt.
Streaming-Synthese
Wenn der Analyzer bestätigt, dass das Ziel erreicht wurde (analysis.achieved == True), streamt die Pipeline eine synthetisierte endgültige Antwort an den Benutzer über PlanAnalyzer.stream_synthesize().
Eingabe. Der Syntheseaufruf erhält drei Eingaben: das ursprüngliche Ziel, die formatierten Schrittergebnisse (max. 10.000 Zeichen pro Schritt) und die Begründung des Analyzers aus dem nicht-streaming Analyseaufruf. Die Begründung bietet eine “Roadmap” für das, was die Synthese abdecken sollte.
Systemaufforderung. Die Syntheseaufforderung weist das LLM an, direkt zu antworten, ohne Meta-Kommentare (“do NOT include phrases like ‘based on the results’”), die Sprache des ursprünglichen Ziels zu verwenden und Ergebnisse aus verschiedenen Quellen gegebenenfalls zu vergleichen. Eine Sprachanweisung aus Benutzereinstellungen wird angehängt, falls verfügbar.
Streaming. Die Methode verwendet stream_chat(), um Token inkrementell auszugeben. Die SSE-Schicht umhüllt jeden Chunk in einem answer-Ereignis mit status: "delta", was dem Frontend die Echtzeitdarstellung der endgültigen Antwort ermöglicht.
Fallback-Kette. Zwei Fallback-Pfade behandeln Fehler:
-
stream_synthesize wirft eine Ausnahme — Fallback zu
analysis.final_answeraus dem nicht-streaminganalyze()-Aufruf. Diese Antwort wurde bereits während der Analyse generiert, daher ist sie verfügbar, auch wenn der Streaming-Aufruf fehlschlägt. -
Ziel nicht erreicht (keine Synthese versucht) — alle abgeschlossenen Schrittergebnisse verketten, getrennt durch horizontale Linien. Jedem Ergebnis wird seine Schritt-ID vorangestellt. Wenn überhaupt keine Schritte abgeschlossen wurden, geben Sie
"(goal not achieved)"zurück.
Zwei-LLM-Architektur
Das Kosten- und Latenzprofil der DAG-Engine wird durch sein Dual-Model-Design geprägt. Die Arbeitsteilung ist:| Rolle | Verwendet für | Optimiert für |
|---|---|---|
| Smart LLM | Planung, Analyse, Antwortsynthesize | Reasoning-Fähigkeit |
| Fast LLM | Schrittausführung, Kontextkomprimierung, Verlaufszusammenfassung | Kosten und Latenz |
model_hint auf jedem PlanStep ermöglicht es dem Planer, einzelne Schritte zum Smart LLM zu befördern. Wenn model_hint null ist, verwendet der Executor den Standard-Agenten (Fast LLM). Wenn es "fast" ist, verwendet der Executor explizit das Fast LLM über die Model Registry. Der Planer wird angewiesen, "fast" für deterministische Aufgaben und null für komplexes Reasoning zu setzen, kann aber auch auf jede benutzerdefinierte Rolle gesetzt werden, die in der ModelRegistry registriert ist. Die Modellauflösung erfolgt einmal pro Schritt über _resolve_agent() unmittelbar vor Beginn der ReAct-Schleife dieses Schritts — alle Iterationen innerhalb des Schritts (Tool-Auswahl, die ReAct-Schleife, ContextGuard-Komprimierung) verwenden das gleiche aufgelöste LLM. Das Modell wechselt nie mitten im Schritt.
Budget-Unabhängigkeit. Die Smart- und Fast-LLMs haben unabhängige Kontextbudgets, die aus ihren jeweiligen Modellkonfigurationen berechnet werden. Die DAG-Schrittausführung verwendet das Budget des Fast LLM; die Planungs- und Analyseaufrufe verwenden das Budget des Smart LLM. Dies ist wichtig, da Operatoren häufig ein großes Kontextmodell (128K+) für die Planung mit einem kleineren, schnelleren Modell (32K) für die Schrittausführung kombinieren. Weitere Informationen zur Berechnung von Budgets finden Sie unter Context Management — Budget Configuration.