跳转到主要内容

概述

FIM One 围绕一组精简的抽象基类构建——每个可交换组件对应一个基类。每个组件都有单一职责和最小化接口。你实现抽象方法,将实例连接到相应的注册表或注入器,系统的其余部分会自动使用你的实现。
扩展点基类文件注册
LLM 提供商BaseLLMcore/model/base.pyModelRegistry.register()
工具BaseToolcore/tool/base.pybuiltin/ 中放置文件
内存BaseMemorycore/memory/base.py构造函数注入
嵌入BaseEmbeddingcore/embedding/base.py构造函数注入
图像生成BaseImageGencore/image_gen/base.py构造函数注入
重排序器BaseRerankercore/reranker/base.py构造函数注入
Web 获取后端BaseWebFetchcore/web/fetch/base.py构造函数注入
Web 搜索后端BaseWebSearchcore/web/search/base.py构造函数注入
RAG 检索器BaseRetrieverrag/base.py构造函数注入
文档加载器BaseLoaderrag/loaders/base.py加载器注册表 / 注入
文本分块器BaseChunkerrag/chunking/base.py构造函数注入

自定义 LLM 提供商

BaseLLM 有两个必需方法 — chatstream_chat — 加上一个可选的 abilities 属性,用于告诉系统该模型能做什么。
from collections.abc import AsyncIterator
from typing import Any

from fim_one.core.model.base import BaseLLM
from fim_one.core.model.types import ChatMessage, LLMResult, StreamChunk


class MyLLM(BaseLLM):
    def __init__(self, api_key: str, model: str) -> None:
        self._api_key = api_key
        self._model = model

    @property
    def model_id(self) -> str:
        return self._model

    @property
    def abilities(self) -> dict[str, bool]:
        return {
            "tool_call": True,   # supports native function calling
            "json_mode": True,   # supports response_format JSON mode
            "vision":   False,
            "streaming": True,
        }

    async def chat(
        self,
        messages: list[ChatMessage],
        *,
        tools: list[dict[str, Any]] | None = None,
        tool_choice: str | dict[str, Any] | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
        response_format: dict[str, Any] | None = None,
    ) -> LLMResult:
        # Call your provider, return LLMResult(message=..., usage=...)
        ...

    async def stream_chat(
        self,
        messages: list[ChatMessage],
        *,
        tools: list[dict[str, Any]] | None = None,
        tool_choice: str | dict[str, Any] | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
    ) -> AsyncIterator[StreamChunk]:
        # Yield StreamChunk instances as tokens arrive
        ...
        yield  # make type-checker happy

通过 ModelRegistry 注册

ModelRegistry 将名称映射到 BaseLLM 实例,并按角色解析。系统使用四个内置角色:generalfastcompactvision。你可以添加自己的角色。
from fim_one.core.model.registry import ModelRegistry

registry = ModelRegistry()
registry.register("my-llm", MyLLM(api_key="...", model="my-v1"), roles=["general"])
registry.register("my-fast", MyLLM(api_key="...", model="my-mini"), roles=["fast", "compact"])

# 稍后检索
llm = registry.get_default()           # first "general" model, or first registered
llm = registry.get_by_role("fast")     # first model with the "fast" role
llm = registry.get("my-llm")           # by exact name
abilities 字典是 LLM 和 ReAct 引擎之间的契约。当 tool_call=True 且智能体是使用 use_native_tools=True 创建时,引擎将使用原生函数调用。否则它会自动回退到 JSON 模式。

自定义工具

工具是最常见的扩展。BaseTool 有三个必需的部分:namedescriptionrun。其他所有内容都有合理的默认值。
from typing import Any
from fim_one.core.tool.base import BaseTool


class GitStatusTool(BaseTool):
    @property
    def name(self) -> str:
        return "git_status"

    @property
    def description(self) -> str:
        return "Return the current git status of a repository."

    @property
    def category(self) -> str:
        return "filesystem"   # groups the tool in the UI

    @property
    def parameters_schema(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the repository root.",
                }
            },
            "required": ["path"],
        }

    async def run(self, *, path: str, **kwargs: Any) -> str:
        import asyncio
        result = await asyncio.create_subprocess_shell(
            f"git -C {path} status --short",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await result.communicate()
        return stdout.decode()

自动发现

将你的文件放在 src/fim_one/core/tool/builtin/ 中。discover_builtin_tools() 扫描器会自动找到任何具体的(非抽象的)BaseTool 子类——无需手动注册。
src/fim_one/core/tool/builtin/
├── calculator.py       ← 现有工具
├── git_status.py       ← 你的新文件 → 自动发现
└── ...
扫描器会跳过 _SKIP_AUTO_DISCOVER 中列出的类。对于需要外部配置(例如 API 密钥)且需要在启动时有条件地实例化的工具,请使用该集合。

信号不可用

覆盖 availability() 以在工具目录中显示依赖项缺失时的消息:
def availability(self) -> tuple[bool, str | None]:
    import os
    if not os.getenv("GITHUB_TOKEN"):
        return False, "GITHUB_TOKEN environment variable is not set."
    return True, None

使用工件获得丰富的结果

当你的工具生成文件时,返回 ToolResult 而不是普通的 str
from fim_one.core.tool.base import Artifact, ToolResult

async def run(self, **kwargs: Any) -> ToolResult:
    # ... produce a file at /tmp/report.html ...
    return ToolResult(
        content="Report generated.",
        content_type="text",
        artifacts=[Artifact(name="report.html", path="/uploads/report.html", mime_type="text/html", size=4096)],
    )

自定义内存

BaseMemory 是对话历史的持久化层。包含三个方法:add_messageget_messagesclear
import redis.asyncio as redis
from fim_one.core.memory.base import BaseMemory
from fim_one.core.model.types import ChatMessage


class RedisMemory(BaseMemory):
    def __init__(self, conversation_id: str, redis_url: str) -> None:
        self._key = f"conv:{conversation_id}"
        self._redis = redis.from_url(redis_url)

    async def add_message(self, message: ChatMessage) -> None:
        import json
        await self._redis.rpush(self._key, json.dumps(message))

    async def get_messages(self) -> list[ChatMessage]:
        import json
        raw = await self._redis.lrange(self._key, 0, -1)
        return [json.loads(m) for m in raw]

    async def clear(self) -> None:
        await self._redis.delete(self._key)
通过智能体构造函数注入:ReActAgent(llm=llm, memory=RedisMemory(conv_id, url))

自定义嵌入

BaseEmbedding 提供两个方法:embed_texts(批量)和 embed_query(单个),以及一个 dimension 属性。
from fim_one.core.embedding.base import BaseEmbedding


class MyEmbedding(BaseEmbedding):
    def __init__(self, model: str) -> None:
        self._model = model
        self._dim = 1536

    @property
    def dimension(self) -> int:
        return self._dim

    async def embed_texts(self, texts: list[str]) -> list[list[float]]:
        # Batch embed documents
        ...

    async def embed_query(self, query: str) -> list[float]:
        # Embed a single query — often uses a different instruction prefix
        ...
embed_textsembed_query 之间的区别存在是因为许多嵌入模型(例如 E5、BGE)对文档和查询使用不同的前缀,以提高检索质量。

自定义图像生成

BaseImageGen 有一个单一方法 generate。它将图像保存到 output_dir 并返回一个 ImageResult,其中包含文件路径和服务器相对 URL。
from fim_one.core.image_gen.base import BaseImageGen, ImageResult


class StableDiffusionImageGen(BaseImageGen):
    async def generate(
        self,
        prompt: str,
        *,
        aspect_ratio: str = "1:1",
        output_dir: str,
    ) -> ImageResult:
        # Call your SD API, save to output_dir
        file_path = f"{output_dir}/image.png"
        return ImageResult(
            file_path=file_path,
            url=f"/uploads/{file_path.split('/')[-1]}",
            prompt=prompt,
            model="stable-diffusion-xl",
        )

自定义重排器

BaseReranker 接收一个查询和文档字符串列表,并返回重新排序后的结果及其分数。
from fim_one.core.reranker.base import BaseReranker, RerankResult


class CrossEncoderReranker(BaseReranker):
    async def rerank(
        self, query: str, documents: list[str], *, top_k: int = 5
    ) -> list[RerankResult]:
        # Score each (query, doc) pair with a cross-encoder
        scores = await self._score_pairs(query, documents)
        results = [
            RerankResult(index=i, score=score, text=doc)
            for i, (doc, score) in enumerate(zip(documents, scores))
        ]
        results.sort(key=lambda r: r.score, reverse=True)
        return results[:top_k]

自定义网络后端

Web 获取

BaseWebFetch 获取一个 URL 并将其内容作为 Markdown 或纯文本返回。
from fim_one.core.web.fetch.base import BaseWebFetch


class PlaywrightFetch(BaseWebFetch):
    async def fetch(self, url: str) -> str:
        # Use Playwright to render JS-heavy pages
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            page = await browser.new_page()
            await page.goto(url)
            content = await page.content()
            await browser.close()
        return html_to_markdown(content)

网络搜索

BaseWebSearch 返回一个排序的 SearchResult 对象列表。
from fim_one.core.web.search.base import BaseWebSearch, SearchResult


class BingSearch(BaseWebSearch):
    async def search(self, query: str, *, num_results: int = 10) -> list[SearchResult]:
        # Call Bing Search API
        ...
        return [
            SearchResult(title=r["name"], url=r["url"], snippet=r["snippet"])
            for r in raw_results[:num_results]
        ]

自定义 RAG 组件

RAG 管道有三个独立可交换的阶段:加载、分块和检索。

文档加载器

BaseLoader 将文件路径转换为 LoadedDocument 对象列表。PDF 加载器通常每页返回一个文档。
from pathlib import Path
from fim_one.rag.loaders.base import BaseLoader, LoadedDocument


class DocxLoader(BaseLoader):
    async def load(self, path: Path) -> list[LoadedDocument]:
        from docx import Document
        doc = Document(path)
        text = "\n".join(p.text for p in doc.paragraphs)
        return [LoadedDocument(content=text, metadata={"source": str(path)})]

文本分块器

BaseChunker 将文本分割成 Chunk 对象。MAX_CHUNK_SIZE = 6000 字符是硬性上限 — 超过此大小的块可能会溢出 Jina Embeddings v3 token 窗口。
from typing import Any
from fim_one.rag.chunking.base import BaseChunker, Chunk


class SentenceChunker(BaseChunker):
    def __init__(self, sentences_per_chunk: int = 5) -> None:
        self._n = sentences_per_chunk

    async def chunk(self, text: str, metadata: dict[str, Any] | None = None) -> list[Chunk]:
        import nltk
        sentences = nltk.sent_tokenize(text)
        chunks = []
        for i in range(0, len(sentences), self._n):
            chunk_text = " ".join(sentences[i : i + self._n])
            chunks.append(Chunk(text=chunk_text, metadata=metadata or {}, index=i // self._n))
        return chunks

检索器

BaseRetriever 查询任何后端并返回排序的 Document 对象。
from fim_one.rag.base import BaseRetriever, Document


class ElasticsearchRetriever(BaseRetriever):
    def __init__(self, es_client, index: str) -> None:
        self._es = es_client
        self._index = index

    async def retrieve(self, query: str, *, top_k: int = 5) -> list[Document]:
        resp = await self._es.search(
            index=self._index,
            query={"match": {"content": query}},
            size=top_k,
        )
        return [
            Document(
                content=hit["_source"]["content"],
                metadata=hit["_source"].get("metadata", {}),
                score=hit["_score"],
            )
            for hit in resp["hits"]["hits"]
        ]

设计原则

几个模式在所有基类中保持一致,使自定义实现更容易正确编写: 异步优先。 每个方法都是 async def。即使你的实现是同步的,也要用 asyncio.to_thread() 包装它,而不是阻塞事件循环。 工具的字符串输出。 BaseTool.run() 返回 str(或 ToolResult)。LLM 只能看到文本 — 工具实现负责将复杂数据序列化为可读格式。 最小化接口。 每个基类定义最小的必需契约。BaseMemory 是三个方法;BaseWebFetch 是一个。你永远不需要实现你不需要的功能。 组合优于继承。 基类是接口,不是框架。你在构造时注入你的实现;运行时永远不会对其进行猴子补丁或进一步子类化。