メインコンテンツへスキップ

概要

FIM One は、スワップ可能なコンポーネントごとに 1 つの薄い抽象基底クラスのセットを中心に構築されています。すべてのコンポーネントは単一の責任と最小限のインターフェースを持ちます。抽象メソッドを実装し、インスタンスを適切なレジストリまたはインジェクターに配線すると、システムの残りの部分が自動的に実装を使用します。
拡張ポイント基底クラスファイル登録
LLM プロバイダーBaseLLMcore/model/base.pyModelRegistry.register()
ツールBaseToolcore/tool/base.pybuiltin/ にファイルをドロップ
メモリBaseMemorycore/memory/base.pyコンストラクタインジェクション
埋め込みBaseEmbeddingcore/embedding/base.pyコンストラクタインジェクション
画像生成BaseImageGencore/image_gen/base.pyコンストラクタインジェクション
リランカーBaseRerankercore/reranker/base.pyコンストラクタインジェクション
Web フェッチバックエンドBaseWebFetchcore/web/fetch/base.pyコンストラクタインジェクション
Web 検索バックエンドBaseWebSearchcore/web/search/base.pyコンストラクタインジェクション
RAG レトリーバーBaseRetrieverrag/base.pyコンストラクタインジェクション
ドキュメントローダーBaseLoaderrag/loaders/base.pyローダーレジストリ / インジェクション
テキストチャンカーBaseChunkerrag/chunking/base.pyコンストラクタインジェクション

カスタムLLMプロバイダー

BaseLLMには2つの必須メソッド — chatstream_chat — と、モデルが何ができるかをシステムの残りの部分に伝えるオプションのabilitiesプロパティがあります。
from collections.abc import AsyncIterator
from typing import Any

from fim_one.core.model.base import BaseLLM
from fim_one.core.model.types import ChatMessage, LLMResult, StreamChunk


class MyLLM(BaseLLM):
    def __init__(self, api_key: str, model: str) -> None:
        self._api_key = api_key
        self._model = model

    @property
    def model_id(self) -> str:
        return self._model

    @property
    def abilities(self) -> dict[str, bool]:
        return {
            "tool_call": True,   # supports native function calling
            "json_mode": True,   # supports response_format JSON mode
            "vision":   False,
            "streaming": True,
        }

    async def chat(
        self,
        messages: list[ChatMessage],
        *,
        tools: list[dict[str, Any]] | None = None,
        tool_choice: str | dict[str, Any] | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
        response_format: dict[str, Any] | None = None,
    ) -> LLMResult:
        # Call your provider, return LLMResult(message=..., usage=...)
        ...

    async def stream_chat(
        self,
        messages: list[ChatMessage],
        *,
        tools: list[dict[str, Any]] | None = None,
        tool_choice: str | dict[str, Any] | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
    ) -> AsyncIterator[StreamChunk]:
        # Yield StreamChunk instances as tokens arrive
        ...
        yield  # make type-checker happy

ModelRegistry経由での登録

ModelRegistryは名前をBaseLLMインスタンスにマップし、ロールで解決します。システムは4つの組み込みロールを使用します:generalfastcompact、およびvision。独自のロールを追加できます。
from fim_one.core.model.registry import ModelRegistry

registry = ModelRegistry()
registry.register("my-llm", MyLLM(api_key="...", model="my-v1"), roles=["general"])
registry.register("my-fast", MyLLM(api_key="...", model="my-mini"), roles=["fast", "compact"])

後で取得

llm = registry.get_default()           # first "general" model, or first registered
llm = registry.get_by_role("fast")     # first model with the "fast" role
llm = registry.get("my-llm")           # by exact name
abilities 辞書は、LLM と ReAct エンジン間の契約です。tool_call=True で、エージェントが use_native_tools=True で作成された場合、エンジンはネイティブ関数呼び出しを使用します。それ以外の場合は、自動的に JSON モードにフォールバックします。

カスタムツール

ツールは最も一般的な拡張機能です。BaseToolには3つの必須要素があります:namedescriptionrunです。その他すべてには適切なデフォルト値があります。
from typing import Any
from fim_one.core.tool.base import BaseTool


class GitStatusTool(BaseTool):
    @property
    def name(self) -> str:
        return "git_status"

    @property
    def description(self) -> str:
        return "Return the current git status of a repository."

    @property
    def category(self) -> str:
        return "filesystem"   # groups the tool in the UI

    @property
    def parameters_schema(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the repository root.",
                }
            },
            "required": ["path"],
        }

    async def run(self, *, path: str, **kwargs: Any) -> str:
        import asyncio
        result = await asyncio.create_subprocess_shell(
            f"git -C {path} status --short",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await result.communicate()
        return stdout.decode()

自動検出

src/fim_one/core/tool/builtin/ にファイルをドロップします。discover_builtin_tools() スキャナーは、具体的な(抽象的でない)BaseTool サブクラスを自動的に検出します。手動登録は不要です。
src/fim_one/core/tool/builtin/
├── calculator.py       ← 既存のツール
├── git_status.py       ← 新しいファイル → 自動検出
└── ...
スキャナーは _SKIP_AUTO_DISCOVER にリストされているクラスをスキップします。外部設定(例:APIキー)が必要で、起動時に条件付きでインスタンス化する必要があるツールに対してそのセットを使用します。

利用不可を通知する

依存関係が不足している場合、ツールカタログにメッセージを表示するには availability() をオーバーライドします:
def availability(self) -> tuple[bool, str | None]:
    import os
    if not os.getenv("GITHUB_TOKEN"):
        return False, "GITHUB_TOKEN environment variable is not set."
    return True, None

アーティファクトを含むリッチな結果

ツールがファイルを生成する場合、プレーンな str の代わりに ToolResult を返します:
from fim_one.core.tool.base import Artifact, ToolResult

async def run(self, **kwargs: Any) -> ToolResult:
    # ... produce a file at /tmp/report.html ...
    return ToolResult(
        content="Report generated.",
        content_type="text",
        artifacts=[Artifact(name="report.html", path="/uploads/report.html", mime_type="text/html", size=4096)],
    )

カスタムメモリ

BaseMemory は会話履歴の永続化レイヤーです。3つのメソッドがあります: add_messageget_messagesclear
import redis.asyncio as redis
from fim_one.core.memory.base import BaseMemory
from fim_one.core.model.types import ChatMessage


class RedisMemory(BaseMemory):
    def __init__(self, conversation_id: str, redis_url: str) -> None:
        self._key = f"conv:{conversation_id}"
        self._redis = redis.from_url(redis_url)

    async def add_message(self, message: ChatMessage) -> None:
        import json
        await self._redis.rpush(self._key, json.dumps(message))

    async def get_messages(self) -> list[ChatMessage]:
        import json
        raw = await self._redis.lrange(self._key, 0, -1)
        return [json.loads(m) for m in raw]

    async def clear(self) -> None:
        await self._redis.delete(self._key)
エージェントコンストラクタを通じて注入します: ReActAgent(llm=llm, memory=RedisMemory(conv_id, url))

カスタム埋め込み

BaseEmbedding は 2 つのメソッド embed_texts(バッチ)と embed_query(単一)、および dimension プロパティを提供します。
from fim_one.core.embedding.base import BaseEmbedding


class MyEmbedding(BaseEmbedding):
    def __init__(self, model: str) -> None:
        self._model = model
        self._dim = 1536

    @property
    def dimension(self) -> int:
        return self._dim

    async def embed_texts(self, texts: list[str]) -> list[list[float]]:
        # Batch embed documents
        ...

    async def embed_query(self, query: str) -> list[float]:
        # Embed a single query — often uses a different instruction prefix
        ...
embed_textsembed_query の区別が存在するのは、多くの埋め込みモデル(例:E5、BGE)がドキュメントとクエリに異なるプレフィックスを使用して検索品質を向上させるためです。

カスタム画像生成

BaseImageGen には単一のメソッド generate があります。このメソッドは画像を output_dir に保存し、ファイルパスとサーバー相対 URL を含む ImageResult を返します。
from fim_one.core.image_gen.base import BaseImageGen, ImageResult


class StableDiffusionImageGen(BaseImageGen):
    async def generate(
        self,
        prompt: str,
        *,
        aspect_ratio: str = "1:1",
        output_dir: str,
    ) -> ImageResult:
        # Call your SD API, save to output_dir
        file_path = f"{output_dir}/image.png"
        return ImageResult(
            file_path=file_path,
            url=f"/uploads/{file_path.split('/')[-1]}",
            prompt=prompt,
            model="stable-diffusion-xl",
        )

カスタムリランカー

BaseRerankerはクエリとドキュメント文字列のリストを受け取り、スコア付きで並べ替えて返します。
from fim_one.core.reranker.base import BaseReranker, RerankResult


class CrossEncoderReranker(BaseReranker):
    async def rerank(
        self, query: str, documents: list[str], *, top_k: int = 5
    ) -> list[RerankResult]:
        # Score each (query, doc) pair with a cross-encoder
        scores = await self._score_pairs(query, documents)
        results = [
            RerankResult(index=i, score=score, text=doc)
            for i, (doc, score) in enumerate(zip(documents, scores))
        ]
        results.sort(key=lambda r: r.score, reverse=True)
        return results[:top_k]

カスタムウェブバックエンド

Web fetch

BaseWebFetch はURLをフェッチし、そのコンテンツをMarkdownまたはプレーンテキストとして返します。
from fim_one.core.web.fetch.base import BaseWebFetch


class PlaywrightFetch(BaseWebFetch):
    async def fetch(self, url: str) -> str:
        # Use Playwright to render JS-heavy pages
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            page = await browser.new_page()
            await page.goto(url)
            content = await page.content()
            await browser.close()
        return html_to_markdown(content)

ウェブ検索

BaseWebSearch は、ランク付けされた SearchResult オブジェクトのリストを返します。
from fim_one.core.web.search.base import BaseWebSearch, SearchResult


class BingSearch(BaseWebSearch):
    async def search(self, query: str, *, num_results: int = 10) -> list[SearchResult]:
        # Call Bing Search API
        ...
        return [
            SearchResult(title=r["name"], url=r["url"], snippet=r["snippet"])
            for r in raw_results[:num_results]
        ]

カスタムRAGコンポーネント

RAGパイプラインには、独立して交換可能な3つのステージがあります:ロード、チャンキング、検索。

ドキュメントローダー

BaseLoader はファイルパスを LoadedDocument オブジェクトのリストに変換します。PDF ローダーは通常、ページごとに 1 つのドキュメントを返します。
from pathlib import Path
from fim_one.rag.loaders.base import BaseLoader, LoadedDocument


class DocxLoader(BaseLoader):
    async def load(self, path: Path) -> list[LoadedDocument]:
        from docx import Document
        doc = Document(path)
        text = "\n".join(p.text for p in doc.paragraphs)
        return [LoadedDocument(content=text, metadata={"source": str(path)})]

テキストチャンカー

BaseChunkerはテキストをChunkオブジェクトに分割します。MAX_CHUNK_SIZE = 6000文字がハード上限です — このサイズを超えるチャンクはJina Embeddings v3トークンウィンドウをオーバーフローする可能性があります。
from typing import Any
from fim_one.rag.chunking.base import BaseChunker, Chunk


class SentenceChunker(BaseChunker):
    def __init__(self, sentences_per_chunk: int = 5) -> None:
        self._n = sentences_per_chunk

    async def chunk(self, text: str, metadata: dict[str, Any] | None = None) -> list[Chunk]:
        import nltk
        sentences = nltk.sent_tokenize(text)
        chunks = []
        for i in range(0, len(sentences), self._n):
            chunk_text = " ".join(sentences[i : i + self._n])
            chunks.append(Chunk(text=chunk_text, metadata=metadata or {}, index=i // self._n))
        return chunks

Retriever

BaseRetriever は任意のバックエンドをクエリし、ランク付けされた Document オブジェクトを返します。
from fim_one.rag.base import BaseRetriever, Document


class ElasticsearchRetriever(BaseRetriever):
    def __init__(self, es_client, index: str) -> None:
        self._es = es_client
        self._index = index

    async def retrieve(self, query: str, *, top_k: int = 5) -> list[Document]:
        resp = await self._es.search(
            index=self._index,
            query={"match": {"content": query}},
            size=top_k,
        )
        return [
            Document(
                content=hit["_source"]["content"],
                metadata=hit["_source"].get("metadata", {}),
                score=hit["_score"],
            )
            for hit in resp["hits"]["hits"]
        ]

設計原則

すべての基本クラスにおいて、カスタム実装を正しく記述しやすくするための一貫したパターンがあります: 非同期ファースト。 すべてのメソッドは async def です。実装が同期的であっても、イベントループをブロックするのではなく、asyncio.to_thread() でラップしてください。 ツールからの文字列出力。 BaseTool.run()str(または ToolResult)を返します。LLM が見るのはテキストのみです。ツール実装は複雑なデータを読みやすい形式にシリアライズする責任があります。 最小限のインターフェース。 各基本クラスは必要な最小限の契約を定義します。BaseMemory は 3 つのメソッド、BaseWebFetch は 1 つです。必要のない機能を実装する必要はありません。 継承よりも合成。 基本クラスはインターフェースであり、フレームワークではありません。実装を構築時に注入します。ランタイムは決してモンキーパッチやさらなるサブクラス化を行いません。