Overview

FIM One is built around a set of thin abstract base classes — one per swappable component. Every component has a single responsibility and a minimal interface. You implement the abstract methods, wire the instance into the appropriate registry or injector, and the rest of the system uses your implementation automatically.

| Extension point | Base class | File | Registration |
| --- | --- | --- | --- |
| LLM provider | BaseLLM | core/model/base.py | ModelRegistry.register() |
| Tool | BaseTool | core/tool/base.py | Drop a file in builtin/ |
| Memory | BaseMemory | core/memory/base.py | Constructor injection |
| Embedding | BaseEmbedding | core/embedding/base.py | Constructor injection |
| Image generation | BaseImageGen | core/image_gen/base.py | Constructor injection |
| Reranker | BaseReranker | core/reranker/base.py | Constructor injection |
| Web fetch backend | BaseWebFetch | core/web/fetch/base.py | Constructor injection |
| Web search backend | BaseWebSearch | core/web/search/base.py | Constructor injection |
| RAG retriever | BaseRetriever | rag/base.py | Constructor injection |
| Document loader | BaseLoader | rag/loaders/base.py | Loader registry / injection |
| Text chunker | BaseChunker | rag/chunking/base.py | Constructor injection |

Custom LLM provider

BaseLLM has two required methods — chat and stream_chat — plus an optional abilities property that tells the rest of the system what the model can do.
from collections.abc import AsyncIterator
from typing import Any

from fim_one.core.model.base import BaseLLM
from fim_one.core.model.types import ChatMessage, LLMResult, StreamChunk


class MyLLM(BaseLLM):
    def __init__(self, api_key: str, model: str) -> None:
        self._api_key = api_key
        self._model = model

    @property
    def model_id(self) -> str:
        return self._model

    @property
    def abilities(self) -> dict[str, bool]:
        return {
            "tool_call": True,   # supports native function calling
            "json_mode": True,   # supports response_format JSON mode
            "vision":   False,
            "streaming": True,
        }

    async def chat(
        self,
        messages: list[ChatMessage],
        *,
        tools: list[dict[str, Any]] | None = None,
        tool_choice: str | dict[str, Any] | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
        response_format: dict[str, Any] | None = None,
    ) -> LLMResult:
        # Call your provider, return LLMResult(message=..., usage=...)
        ...

    async def stream_chat(
        self,
        messages: list[ChatMessage],
        *,
        tools: list[dict[str, Any]] | None = None,
        tool_choice: str | dict[str, Any] | None = None,
        temperature: float | None = None,
        max_tokens: int | None = None,
    ) -> AsyncIterator[StreamChunk]:
        # Yield StreamChunk instances as tokens arrive
        ...
        yield  # placeholder: a yield in the body is what makes this an async generator

Registration via ModelRegistry

ModelRegistry maps names to BaseLLM instances and resolves by role. The system uses four built-in roles: general, fast, compact, and vision. You can add your own.
from fim_one.core.model.registry import ModelRegistry

registry = ModelRegistry()
registry.register("my-llm", MyLLM(api_key="...", model="my-v1"), roles=["general"])
registry.register("my-fast", MyLLM(api_key="...", model="my-mini"), roles=["fast", "compact"])

# Retrieve later
llm = registry.get_default()           # first "general" model, or first registered
llm = registry.get_by_role("fast")     # first model with the "fast" role
llm = registry.get("my-llm")           # by exact name
The abilities dict is the contract between the LLM and the ReAct engine. When tool_call=True and the agent was created with use_native_tools=True, the engine will use native function calling. Otherwise it falls back to JSON mode automatically.
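
The fallback rule above can be pictured as a small decision function. This is a minimal sketch, not the engine's actual code: `select_tool_mode` and its return values are hypothetical names, and only the `tool_call` key and the `use_native_tools` flag come from the text.

```python
def select_tool_mode(abilities: dict[str, bool], use_native_tools: bool) -> str:
    """Return "native" when both sides opt in, otherwise "json" (hypothetical labels)."""
    # Native function calling needs the model to advertise tool_call
    # AND the agent to have been created with use_native_tools=True.
    if use_native_tools and abilities.get("tool_call", False):
        return "native"
    # Any other combination falls back to JSON mode.
    return "json"
```

A missing `tool_call` key is treated as `False` here, which is an assumption about how an incomplete abilities dict should be read.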

Custom tool

Tools are the most common extension. BaseTool has three required pieces: name, description, and run. Everything else has sensible defaults.
from typing import Any
from fim_one.core.tool.base import BaseTool


class GitStatusTool(BaseTool):
    @property
    def name(self) -> str:
        return "git_status"

    @property
    def description(self) -> str:
        return "Return the current git status of a repository."

    @property
    def category(self) -> str:
        return "filesystem"   # groups the tool in the UI

    @property
    def parameters_schema(self) -> dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the repository root.",
                }
            },
            "required": ["path"],
        }

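    async def run(self, *, path: str, **kwargs: Any) -> str:
        import asyncio
        # Pass arguments as a list (exec, not shell) so `path` is never
        # interpreted by a shell.
        proc = await asyncio.create_subprocess_exec(
            "git", "-C", path, "status", "--short",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            return f"git status failed: {stderr.decode().strip()}"
        return stdout.decode()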

Auto-discovery

Drop your file in src/fim_one/core/tool/builtin/. The discover_builtin_tools() scanner will find any concrete (non-abstract) BaseTool subclass automatically — no manual registration needed.
src/fim_one/core/tool/builtin/
├── calculator.py       ← existing tool
├── git_status.py       ← your new file → auto-discovered
└── ...
The scanner skips classes listed in _SKIP_AUTO_DISCOVER. Use that set for tools that require external configuration (e.g. an API key) and need to be conditionally instantiated at startup.
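
To make the discovery rule concrete, here is a rough sketch of how such a scanner could work. It is not the real `discover_builtin_tools()`: the `BaseTool` below is a stand-in, `discover` and `SKIP` are hypothetical names, and storing class names (rather than classes) in the skip set is an assumption.

```python
import inspect
from abc import ABC, abstractmethod
from types import SimpleNamespace


class BaseTool(ABC):  # stand-in for fim_one's BaseTool
    @property
    @abstractmethod
    def name(self) -> str: ...


class Calculator(BaseTool):
    @property
    def name(self) -> str:
        return "calculator"


class NeedsApiKey(BaseTool):
    @property
    def name(self) -> str:
        return "needs_api_key"


SKIP = {"NeedsApiKey"}  # plays the role of _SKIP_AUTO_DISCOVER (assumed to hold class names)


def discover(namespace: object) -> list[type[BaseTool]]:
    """Collect concrete BaseTool subclasses found on a module-like object."""
    found = []
    for _, obj in inspect.getmembers(namespace, inspect.isclass):
        # Ignore unrelated classes and anything still abstract (including BaseTool itself).
        if not issubclass(obj, BaseTool) or inspect.isabstract(obj):
            continue
        # Tools in the skip set are left for conditional instantiation at startup.
        if obj.__name__ in SKIP:
            continue
        found.append(obj)
    return found


fake_module = SimpleNamespace(BaseTool=BaseTool, Calculator=Calculator, NeedsApiKey=NeedsApiKey)
discovered = [cls.__name__ for cls in discover(fake_module)]
```

With this stand-in module, only `Calculator` is picked up: `BaseTool` is abstract and `NeedsApiKey` is in the skip set.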

Signalling unavailability

Override availability() to surface a message in the tool catalog when a dependency is missing:
def availability(self) -> tuple[bool, str | None]:
    import os
    if not os.getenv("GITHUB_TOKEN"):
        return False, "GITHUB_TOKEN environment variable is not set."
    return True, None

Rich results with artifacts

Return a ToolResult instead of a plain str when your tool produces files:
from fim_one.core.tool.base import Artifact, ToolResult

async def run(self, **kwargs: Any) -> ToolResult:
    # ... produce a file at /tmp/report.html ...
    return ToolResult(
        content="Report generated.",
        content_type="text",
        artifacts=[Artifact(name="report.html", path="/uploads/report.html", mime_type="text/html", size=4096)],
    )

Custom memory

BaseMemory is the persistence layer for conversation history. Three methods: add_message, get_messages, clear.
import redis.asyncio as redis
from fim_one.core.memory.base import BaseMemory
from fim_one.core.model.types import ChatMessage


class RedisMemory(BaseMemory):
    def __init__(self, conversation_id: str, redis_url: str) -> None:
        self._key = f"conv:{conversation_id}"
        self._redis = redis.from_url(redis_url)

    async def add_message(self, message: ChatMessage) -> None:
        import json
        await self._redis.rpush(self._key, json.dumps(message))

    async def get_messages(self) -> list[ChatMessage]:
        import json
        raw = await self._redis.lrange(self._key, 0, -1)
        return [json.loads(m) for m in raw]

    async def clear(self) -> None:
        await self._redis.delete(self._key)
Inject via the agent constructor: ReActAgent(llm=llm, memory=RedisMemory(conv_id, url)).

Custom embedding

BaseEmbedding provides two methods: embed_texts (batch) and embed_query (single), plus a dimension property.
from fim_one.core.embedding.base import BaseEmbedding


class MyEmbedding(BaseEmbedding):
    def __init__(self, model: str) -> None:
        self._model = model
        self._dim = 1536

    @property
    def dimension(self) -> int:
        return self._dim

    async def embed_texts(self, texts: list[str]) -> list[list[float]]:
        # Batch embed documents
        ...

    async def embed_query(self, query: str) -> list[float]:
        # Embed a single query — often uses a different instruction prefix
        ...
The distinction between embed_texts and embed_query exists because many embedding models (e.g. E5, BGE) use different prefixes for documents vs. queries to improve retrieval quality.
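
As an illustration of that split, E5-style models expect a `query: ` prefix on queries and a `passage: ` prefix on documents. The helpers below are a minimal sketch with hypothetical names; an `embed_texts` / `embed_query` implementation could call them before sending text to the model.

```python
QUERY_PREFIX = "query: "      # E5 convention for search queries
PASSAGE_PREFIX = "passage: "  # E5 convention for indexed documents


def prepare_documents(texts: list[str]) -> list[str]:
    """Prefix every document the way embed_texts would before embedding."""
    return [PASSAGE_PREFIX + text for text in texts]


def prepare_query(query: str) -> str:
    """Prefix a single query the way embed_query would before embedding."""
    return QUERY_PREFIX + query
```

Other model families use different prefixes or none at all, so check the model card before reusing these strings.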

Custom image generation

BaseImageGen has a single method, generate. An implementation saves the image to output_dir and returns an ImageResult with the file path and a server-relative URL.
from fim_one.core.image_gen.base import BaseImageGen, ImageResult


class StableDiffusionImageGen(BaseImageGen):
    async def generate(
        self,
        prompt: str,
        *,
        aspect_ratio: str = "1:1",
        output_dir: str,
    ) -> ImageResult:
        # Call your SD API, save to output_dir
        file_path = f"{output_dir}/image.png"
        return ImageResult(
            file_path=file_path,
            url=f"/uploads/{file_path.split('/')[-1]}",
            prompt=prompt,
            model="stable-diffusion-xl",
        )

Custom reranker

BaseReranker takes a query and a list of document strings and returns them reordered with scores.
from fim_one.core.reranker.base import BaseReranker, RerankResult


class CrossEncoderReranker(BaseReranker):
    async def rerank(
        self, query: str, documents: list[str], *, top_k: int = 5
    ) -> list[RerankResult]:
        # Score each (query, doc) pair with a cross-encoder.
        # _score_pairs is a helper you supply; it returns one float per document.
        scores = await self._score_pairs(query, documents)
        results = [
            RerankResult(index=i, score=score, text=doc)
            for i, (doc, score) in enumerate(zip(documents, scores))
        ]
        results.sort(key=lambda r: r.score, reverse=True)
        return results[:top_k]
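
The score-sort-truncate flow can be tried without a model. The sketch below swaps the cross-encoder for a naive token-overlap score; `overlap_score` and the `RerankResult` dataclass are stand-ins that only mirror the fields used above.

```python
from dataclasses import dataclass


@dataclass
class RerankResult:  # stand-in mirroring the fields used above
    index: int
    score: float
    text: str


def overlap_score(query: str, doc: str) -> float:
    """Fraction of query tokens that also appear in the document."""
    q = set(query.lower().split())
    d = set(doc.lower().split())
    return len(q & d) / len(q) if q else 0.0


def rerank(query: str, documents: list[str], top_k: int = 5) -> list[RerankResult]:
    results = [
        RerankResult(index=i, score=overlap_score(query, doc), text=doc)
        for i, doc in enumerate(documents)
    ]
    # Highest score first, then keep only the top_k entries.
    results.sort(key=lambda r: r.score, reverse=True)
    return results[:top_k]


docs = ["redis stores keys", "python async io", "async python event loop"]
top = rerank("python async loop", docs, top_k=2)
```

Here `top` holds the third document followed by the second. The `index` field keeps each result's position in the original list, so callers can map results back to their source documents.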

Custom web backends

Web fetch

BaseWebFetch fetches a URL and returns its content as Markdown or plain text.
from fim_one.core.web.fetch.base import BaseWebFetch
from playwright.async_api import async_playwright


class PlaywrightFetch(BaseWebFetch):
    async def fetch(self, url: str) -> str:
        # Use Playwright to render JS-heavy pages
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            try:
                page = await browser.new_page()
                await page.goto(url)
                content = await page.content()
            finally:
                await browser.close()  # release the browser even if navigation fails
        # html_to_markdown is a placeholder for your own HTML-to-Markdown converter
        return html_to_markdown(content)

Web search

BaseWebSearch returns a ranked list of SearchResult objects.
from fim_one.core.web.search.base import BaseWebSearch, SearchResult


class BingSearch(BaseWebSearch):
    async def search(self, query: str, *, num_results: int = 10) -> list[SearchResult]:
        # Call the Bing Search API and parse the response;
        # raw_results stands for the resulting list of result dicts.
        ...
        return [
            SearchResult(title=r["name"], url=r["url"], snippet=r["snippet"])
            for r in raw_results[:num_results]
        ]

Custom RAG components

The RAG pipeline has three independently swappable stages: loading, chunking, and retrieval.

Document loader

BaseLoader turns a file path into a list of LoadedDocument objects. PDF loaders typically return one document per page.
from pathlib import Path
from fim_one.rag.loaders.base import BaseLoader, LoadedDocument


class DocxLoader(BaseLoader):
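    async def load(self, path: Path) -> list[LoadedDocument]:
        import asyncio
        from docx import Document

        def _read() -> str:
            # python-docx is synchronous, so parse off the event loop
            doc = Document(str(path))
            return "\n".join(p.text for p in doc.paragraphs)

        text = await asyncio.to_thread(_read)
        return [LoadedDocument(content=text, metadata={"source": str(path)})]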

Text chunker

BaseChunker splits text into Chunk objects. MAX_CHUNK_SIZE = 6000 characters is the hard ceiling — chunk sizes above this can overflow the Jina Embeddings v3 token window.
from typing import Any
from fim_one.rag.chunking.base import BaseChunker, Chunk


class SentenceChunker(BaseChunker):
    def __init__(self, sentences_per_chunk: int = 5) -> None:
        self._n = sentences_per_chunk

    async def chunk(self, text: str, metadata: dict[str, Any] | None = None) -> list[Chunk]:
        import nltk
        sentences = nltk.sent_tokenize(text)
        chunks = []
        for i in range(0, len(sentences), self._n):
            chunk_text = " ".join(sentences[i : i + self._n])
            chunks.append(Chunk(text=chunk_text, metadata=metadata or {}, index=i // self._n))
        return chunks
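
A chunker that groups by sentence count can still produce an oversized chunk when sentences are long. One way to guard against that is a post-processing pass like the sketch below. `enforce_ceiling` is a hypothetical helper, and it assumes the ceiling is not already enforced elsewhere in the pipeline.

```python
MAX_CHUNK_SIZE = 6000  # hard ceiling stated above, in characters


def enforce_ceiling(texts: list[str], limit: int = MAX_CHUNK_SIZE) -> list[str]:
    """Split any text longer than `limit` into consecutive slices of at most `limit` chars."""
    pieces: list[str] = []
    for text in texts:
        # Texts at or under the limit pass through as a single slice;
        # empty strings produce no slice and are dropped.
        for start in range(0, len(text), limit):
            pieces.append(text[start : start + limit])
    return pieces
```

This cuts at fixed character offsets, which can split a word in half. A production chunker would more likely back off to the nearest sentence or whitespace boundary.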

Retriever

BaseRetriever queries any backend and returns ranked Document objects.
from fim_one.rag.base import BaseRetriever, Document


class ElasticsearchRetriever(BaseRetriever):
    def __init__(self, es_client, index: str) -> None:
        self._es = es_client
        self._index = index

    async def retrieve(self, query: str, *, top_k: int = 5) -> list[Document]:
        resp = await self._es.search(
            index=self._index,
            query={"match": {"content": query}},
            size=top_k,
        )
        return [
            Document(
                content=hit["_source"]["content"],
                metadata=hit["_source"].get("metadata", {}),
                score=hit["_score"],
            )
            for hit in resp["hits"]["hits"]
        ]

Design principles

A few patterns hold across all base classes and make custom implementations easier to write correctly:

Async-first. Every method that performs I/O is async def. If your implementation is synchronous, wrap it with asyncio.to_thread() rather than blocking the event loop.

String output from tools. BaseTool.run() returns str (or ToolResult). The LLM only ever sees text, so tool implementations are responsible for serializing complex data into a readable format.

Minimal interfaces. Each base class defines the smallest contract needed. BaseMemory is three methods; BaseWebFetch is one. You are never required to implement functionality you don't need.

Composition over inheritance. The base classes are interfaces, not frameworks. You inject your implementation at construction time; the runtime never monkey-patches or subclasses it further.
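
The async-first rule can be illustrated with a blocking function moved onto a worker thread. This is a minimal sketch: `slow_word_count` and `count_words` are hypothetical names, and `time.sleep` stands in for any synchronous library call.

```python
import asyncio
import time


def slow_word_count(text: str) -> int:
    """A blocking function, e.g. a call into a synchronous library."""
    time.sleep(0.05)  # simulates blocking work
    return len(text.split())


async def count_words(text: str) -> str:
    # to_thread runs the blocking call in a worker thread, so the event
    # loop stays free to serve other coroutines in the meantime.
    count = await asyncio.to_thread(slow_word_count, text)
    # Tools return text, so the number is serialized into a string.
    return f"{count} words"


result = asyncio.run(count_words("wrap blocking calls in a thread"))
```

The return value is formatted as a string, in line with the rule that tool output reaches the LLM as text.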