• 前提环境准备:本地具备 docker python3 环境

  • 以及深度阅读我们的 ollama 的文档

  • 核心使用实现后端服务层的集成框架选型:Flask | sanic | Fastapi 随意选择

    • 个人比较喜欢的是 sanic,其次fastapi,然后最后考虑 Flask 吧

  • 为什么这里选择使用python作为后端服务层的开发呐!因为 ollama 官方文档中核心只能使用我们的 python 或者 nodejs

Ollma 创建项目

  • 确保本地有 uv 工具

    • mkdir fastapi_ollama_app

    • cd fastapi_ollama_app && uv init

# project.toml 的默认配置吧
[project]
name = "fastapi-ollama-app"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.14"
dependencies = []
  • 开始安装依赖吧

    • uv add fastapi[all] 服务层的主要依赖,核心是依赖他进行书写对应的后端 api 接口吧

      • 这里的 fastapi 的标签你可以选择更多,为了快速上手,这里就安装 [all] feature 的吧,毕竟没有很多的个性化适配讷

    • uv add httpx 异步的 http客户端,性能方面是优于 requests 的,用这个库进行取代requests 吧

    • uv add python-dotenv 类似于我们 nodejs 中的 process.env 或者 import.meta.env 或者 dotenv 工具吧,都是用于进行获取环境变量配置的,实现项目搭建的时候的可配置化实现吧

    • uv add pydantic 负责的是fastapi的数据层的东西吧,当然如果需要进行web层的实现,那就去看官网,有明确的依赖提示

  • 到这里,基础的一些环境就搭建完毕了,想要集成其他的,那就继续调研吧,不嫌弃的话,opencv 也是可以集成的,哈哈哈

Ollama 目录搭建

  • 这里来一个规范吧:src app 区分,一般对于前端来说项目的核心源代码是在 src 目录下,但是也是可以 app 目录下的讷

    • mkdir -p app/{api,core,utils,models}

      • api 主要是负责的是服务暴露该外部使用的 api 接口的实现的目录吧

      • core 就是业务中的核心业务层吧,负责的是我们的业务数据处理的封装处理吧

      • models 负责的是定义数据类型和业务接口吧

      • utils 自定义项目工具吧

    • 对于 python 而言__init__.py 的核心作用就是让每个目录作为单独的 package 来实现吧,核心是方便后续的模块引用吧

touch app/__init__.py
touch app/api/__init__.py
touch app/core/__init__.py
touch app/models/__init__.py
touch app/utils/__init__.py
touch app/main.py
  • 创建环境变量的管理

    • touch .env

OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_TIMEOUT=3000  # 这里需要进行详细的调整,因为本地的 local model 的的确确有响应慢的问题,呃呃呃

Ollama core 核心业务代码书写

  • 读取环境变量

    • 1. os 模块读取env配置

      • os.getenv("env配置字段", "默认值") 或者 os.environ["字段名"]

      • 当然选择有可以设置默认值的来进行操作了呀

    • 2. python_dotenv 进行读取获取得到基本的配置吧,默认读取的是 .env 当然也是可以直接进行对应的指定读取目录的讷

    • 3. pathlib 明确路径进行读取吧

    • 4. pydantic 模型增强实现吧

    • 这里的踩坑的核心需要注意一点的是:注意python 的版本问题,我这里使用的是我们的 python3.11.14 版本吧

  • 自定义 ollama 客户端

    • 这里为什么可以实现定义讷:因为 docker 本地部署了 ollama 后,ollama 是给我们提供了很多的对接的接口的,可以直接使用网络请求的客户端工具进行实现集成吧,十分方便的讷,直接用就行了

    • python 提供的模拟网络请求客户端操作含有

      • httpx 库的使用

API 端点

HTTP 方法

主要功能

请求参数示例

响应示例

/api/tags

GET

获取本地模型列表

{"models": [{"name": "llama3.1:8b", "size": 4683087332, ...}]}

/api/generate

POST

流式文本生成

{"model": "llama3.1:8b", "prompt": "Hello", "stream": true}

流式返回:{"model": "...", "response": "Hi", "done": false}

/api/chat

POST

多轮对话

{"model": "llama3.1:8b", "messages": [{"role": "user", "content": "Hi"}]}

{"message": {"role": "assistant", "content": "Hello!"}}

/api/pull

POST

拉取模型

{"model": "llama3.1:8b", "stream": true}

流式返回下载进度

/api/delete

DELETE

删除模型

{"model": "llama3.1:8b"}

{"status": "success"}

/api/copy

POST

复制模型

{"source": "llama3.1:8b", "destination": "my-llama"}

{"status": "success"}

/api/show

POST

显示模型信息

{"model": "llama3.1:8b"}

{"license": "...", "template": "...", "parameters": ...}

/api/ps

GET

查看运行中的模型

{"models": [{"name": "llama3.1:8b", "size": "...", "digest": "..."}]}

/api/embeddings

POST

获取文本嵌入

{"model": "llama3.1:8b", "prompt": "text"}

{"embedding": [0.1, 0.2, ...]}

Ollama client

import httpx
import json
import time
from typing import List, Dict, Optional, Generator, AsyncGenerator, Union, Any

# 定义一些常用的类型别名,增强代码可读性
ModelInfo = Dict[str, Any]
GenerateResponse = Dict[str, Any]
ChatMessage = Dict[str, str]
EmbeddingVector = List[float]

class OllamaClient:
    """
    一个全面的 Ollama API 客户端,使用 httpx 实现。
    支持所有官方 API 并提供便捷的拓展功能。
    """

    def __init__(self, base_url: str = "http://localhost:11434", timeout: float = 60.0):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        # 创建可复用的同步和异步客户端
        self._client = httpx.Client(timeout=self.timeout)
        self._async_client = httpx.AsyncClient(timeout=self.timeout)

    def __del__(self):
        """在对象销毁时关闭客户端连接池"""
        if hasattr(self, '_client') and not self._client.is_closed:
            self._client.close()
        if hasattr(self, '_async_client') and not self._async_client.is_closed:
            try:
                # 异步关闭需要在事件循环中进行,如果循环已关闭,可能会报错
                import asyncio
                if asyncio.get_event_loop().is_running():
                    asyncio.create_task(self._async_client.aclose())
                else:
                    # 如果事件循环已关闭,同步关闭
                    self._async_client.close()
            except Exception:
                self._async_client.close()

    # --- 私有辅助方法 ---

    def _handle_response(self, response: httpx.Response) -> Dict[str, Any]:
        """处理响应,检查状态码并解析JSON"""
        try:
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            try:
                error_data = e.response.json()
                error_msg = error_data.get("error", str(e))
            except Exception:
                error_msg = str(e)
            raise OllamaAPIError(f"API 请求失败: {error_msg}") from e

    async def _async_handle_response(self, response: httpx.Response) -> Dict[str, Any]:
        """异步处理响应"""
        try:
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            try:
                error_data = e.response.json()
                error_msg = error_data.get("error", str(e))
            except Exception:
                error_msg = str(e)
            raise OllamaAPIError(f"API 请求失败: {error_msg}") from e

    # --- 模型管理 API ---

    def pull_model(self, model_name: str, stream: bool = False) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
        """
        从 Ollama 库中拉取(下载)模型。
        :param model_name: 模型名称,例如 "llama3:8b"
        :param stream: 是否流式获取下载进度
        :return: 如果 stream=True,返回一个生成器,否则返回最终状态
        """
        url = f"{self.base_url}/api/pull"
        payload = {"name": model_name, "stream": stream}

        if not stream:
            response = self._client.post(url, json=payload)
            return self._handle_response(response)
        else:
            def stream_generator():
                with self._client.stream("POST", url, json=payload) as response:
                    for line in response.iter_lines():
                        if line:
                            yield json.loads(line)
            return stream_generator()

    def push_model(self, model_name: str, stream: bool = False) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
        """
        将本地模型推送到 Ollama 库。
        :param model_name: 模型名称
        :param stream: 是否流式获取上传进度
        :return: 如果 stream=True,返回一个生成器,否则返回最终状态
        """
        url = f"{self.base_url}/api/push"
        payload = {"name": model_name, "stream": stream}

        if not stream:
            response = self._client.post(url, json=payload)
            return self._handle_response(response)
        else:
            def stream_generator():
                with self._client.stream("POST", url, json=payload) as response:
                    for line in response.iter_lines():
                        if line:
                            yield json.loads(line)
            return stream_generator()

    def list_models(self) -> List[ModelInfo]:
        """
        列出所有已安装的本地模型。
        :return: 模型信息列表
        """
        url = f"{self.base_url}/api/tags"
        response = self._client.get(url)
        data = self._handle_response(response)
        return data.get("models", [])

    def show_model(self, model_name: str) -> ModelInfo:
        """
        获取特定模型的详细信息。
        :param model_name: 模型名称
        :return: 模型详细信息
        """
        url = f"{self.base_url}/api/show"
        payload = {"name": model_name}
        response = self._client.post(url, json=payload)
        return self._handle_response(response)

    def delete_model(self, model_name: str) -> Dict[str, Any]:
        """
        删除本地模型。
        :param model_name: 模型名称
        :return: 操作结果
        """
        url = f"{self.base_url}/api/delete"
        payload = {"name": model_name}
        response = self._client.delete(url, json=payload)
        return self._handle_response(response)

    # --- 生成与聊天 API ---

    def generate(
        self,
        model: str,
        prompt: str,
        system: Optional[str] = None,
        template: Optional[str] = None,
        context: Optional[List[int]] = None,
        stream: bool = False,
        raw: bool = False,
        options: Optional[Dict[str, Any]] = None,
    ) -> Union[GenerateResponse, Generator[GenerateResponse, None, None]]:
        """
        根据提示生成文本。
        :param model: 模型名称
        :param prompt: 用户提示
        :param system: 系统提示,用于指导模型行为
        :param template: 自定义提示模板
        :param context: 来自先前生成的上下文(用于多轮对话)
        :param stream: 是否流式返回结果
        :param raw: 是否返回原始的、未格式化的响应
        :param options: 模型特定的生成选项,如 temperature, top_p 等
        :return: 如果 stream=True,返回一个生成器,否则返回完整响应
        """
        url = f"{self.base_url}/api/generate"
        payload: Dict[str, Any] = {
            "model": model,
            "prompt": prompt,
            "stream": stream,
            "raw": raw,
        }
        if system: payload["system"] = system
        if template: payload["template"] = template
        if context: payload["context"] = context
        if options: payload["options"] = options

        if not stream:
            response = self._client.post(url, json=payload)
            return self._handle_response(response)
        else:
            def stream_generator():
                with self._client.stream("POST", url, json=payload) as response:
                    for line in response.iter_lines():
                        if line:
                            yield json.loads(line)
            return stream_generator()

    def chat(
        self,
        model: str,
        messages: List[ChatMessage],
        stream: bool = False,
        options: Optional[Dict[str, Any]] = None,
    ) -> Union[GenerateResponse, Generator[GenerateResponse, None, None]]:
        """
        进行聊天对话。
        :param model: 模型名称
        :param messages: 消息历史列表,每个消息包含 "role" 和 "content"
        :param stream: 是否流式返回结果
        :param options: 模型特定的生成选项
        :return: 如果 stream=True,返回一个生成器,否则返回完整响应
        """
        url = f"{self.base_url}/api/chat"
        payload: Dict[str, Any] = {
            "model": model,
            "messages": messages,
            "stream": stream,
        }
        if options: payload["options"] = options

        if not stream:
            response = self._client.post(url, json=payload)
            return self._handle_response(response)
        else:
            def stream_generator():
                with self._client.stream("POST", url, json=payload) as response:
                    for line in response.iter_lines():
                        if line:
                            yield json.loads(line)
            return stream_generator()

    def create_embedding(self, model: str, prompt: str) -> EmbeddingVector:
        """
        为给定的文本生成嵌入向量。
        :param model: 模型名称(需支持嵌入功能)
        :param prompt: 输入文本
        :return: 嵌入向量
        """
        url = f"{self.base_url}/api/embeddings"
        payload = {"model": model, "prompt": prompt}
        response = self._client.post(url, json=payload)
        data = self._handle_response(response)
        return data.get("embedding", [])

    # --- 服务健康检查 ---

    def is_healthy(self) -> bool:
        """
        检查 Ollama 服务是否正在运行。
        :return: 如果服务正常,返回 True,否则返回 False
        """
        try:
            response = self._client.get(f"{self.base_url}/")
            return response.status_code == 200 and "Ollama is running" in response.text
        except Exception:
            return False

    # --- 拓展功能 ---

    def wait_for_service(self, timeout: float = 30.0) -> bool:
        """
        等待 Ollama 服务启动。
        :param timeout: 超时时间(秒)
        :return: 如果在超时前服务启动,返回 True,否则返回 False
        """
        start_time = time.time()
        while time.time() - start_time < timeout:
            if self.is_healthy():
                return True
            time.sleep(1)
        return False

    def find_model(self, model_name: str) -> Optional[ModelInfo]:
        """
        在已安装的模型中查找特定模型。
        :param model_name: 模型名称(可以是模糊匹配,如 "llama3")
        :return: 如果找到,返回模型信息,否则返回 None
        """
        models = self.list_models()
        for model in models:
            if model_name in model.get("name", ""):
                return model
        return None

    # --- 异步 API ---

    async def async_pull_model(self, model_name: str, stream: bool = False) -> Union[Dict[str, Any], AsyncGenerator[Dict[str, Any], None]]:
        url = f"{self.base_url}/api/pull"
        payload = {"name": model_name, "stream": stream}
        if not stream:
            response = await self._async_client.post(url, json=payload)
            return await self._async_handle_response(response)
        else:
            async def stream_generator():
                async with self._async_client.stream("POST", url, json=payload) as response:
                    async for line in response.aiter_lines():
                        if line:
                            yield json.loads(line)
            return stream_generator()

    async def async_list_models(self) -> List[ModelInfo]:
        url = f"{self.base_url}/api/tags"
        response = await self._async_client.get(url)
        data = await self._async_handle_response(response)
        return data.get("models", [])

    async def async_generate(self, model: str, prompt: str, **kwargs) -> GenerateResponse:
        url = f"{self.base_url}/api/generate"
        kwargs.update({"model": model, "prompt": prompt, "stream": False})
        response = await self._async_client.post(url, json=kwargs)
        return await self._async_handle_response(response)
        
    async def async_chat(self, model: str, messages: List[ChatMessage], **kwargs) -> GenerateResponse:
        url = f"{self.base_url}/api/chat"
        kwargs.update({"model": model, "messages": messages, "stream": False})
        response = await self._async_client.post(url, json=kwargs)
        return await self._async_handle_response(response)

    async def async_create_embedding(self, model: str, prompt: str) -> EmbeddingVector:
        url = f"{self.base_url}/api/embeddings"
        payload = {"model": model, "prompt": prompt}
        response = await self._async_client.post(url, json=payload)
        data = await self._async_handle_response(response)
        return data.get("embedding", [])

    async def async_is_healthy(self) -> bool:
        try:
            response = await self._async_client.get(f"{self.base_url}/")
            return response.status_code == 200 and "Ollama is running" in response.text
        except Exception:
            return False

class OllamaAPIError(Exception):
    """Ollama API 调用异常"""
    pass