前提环境准备:本地具备 docker python3 环境
以及深度阅读我们的 ollama 的文档
核心使用实现后端服务层的集成框架选型:Flask | sanic | Fastapi 随意选择
个人比较喜欢的是 sanic,其次fastapi,然后最后考虑 Flask 吧
为什么这里选择使用python作为后端服务层的开发呐!因为 ollama 官方文档中核心只能使用我们的 python 或者 nodejs
Ollma 创建项目
确保本地有
uv工具mkdir fastapi_ollama_appcd fastapi_ollama_app&&uv init

# project.toml 的默认配置吧
[project]
name = "fastapi-ollama-app"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.14"
dependencies = []
开始安装依赖吧
uv add fastapi[all]服务层的主要依赖,核心是依赖他进行书写对应的后端 api 接口吧这里的 fastapi 的标签你可以选择更多,为了快速上手,这里就安装 [all] feature 的吧,毕竟没有很多的个性化适配讷
uv add httpx异步的 http客户端,性能方面是优于 requests 的,用这个库进行取代requests 吧uv add python-dotenv类似于我们 nodejs 中的 process.env 或者 import.meta.env 或者 dotenv 工具吧,都是用于进行获取环境变量配置的,实现项目搭建的时候的可配置化实现吧uv add pydantic负责的是fastapi的数据层的东西吧,当然如果需要进行web层的实现,那就去看官网,有明确的依赖提示
到这里,基础的一些环境就搭建完毕了,想要集成其他的,那就继续调研吧,不嫌弃的话,opencv 也是可以集成的,哈哈哈
Ollama 目录搭建
这里来一个规范吧:src app 区分,一般对于前端来说项目的核心源代码是在 src 目录下,但是也是可以 app 目录下的讷
mkdir -p app/{api,core,utils,models}api 主要是负责的是服务暴露该外部使用的 api 接口的实现的目录吧
core 就是业务中的核心业务层吧,负责的是我们的业务数据处理的封装处理吧
models 负责的是定义数据类型和业务接口吧
utils 自定义项目工具吧
对于 python 而言
__init__.py的核心作用就是让每个目录作为单独的 package 来实现吧,核心是方便后续的模块引用吧
touch app/__init__.py
touch app/api/__init__.py
touch app/core/__init__.py
touch app/models/__init__.py
touch app/utils/__init__.py
touch app/main.py创建环境变量的管理
touch .env
OLLAMA_BASE_URL=http://localhost:11434
OLLAMA_TIMEOUT=3000 # 这里需要进行详细的调整,因为本地的 local model 的的确确有响应慢的问题,呃呃呃Ollama core 核心业务代码书写
读取环境变量
1. os 模块读取env配置
os.getenv("env配置字段", "默认值")或者os.environ["字段名"]当然选择有可以设置默认值的来进行操作了呀
2.
python_dotenv进行读取获取得到基本的配置吧,默认读取的是.env当然也是可以直接进行对应的指定读取目录的讷3.
pathlib明确路径进行读取吧4.
pydantic模型增强实现吧这里的踩坑的核心需要注意一点的是:注意python 的版本问题,我这里使用的是我们的 python3.11.14 版本吧
自定义 ollama 客户端
这里为什么可以实现定义讷:因为 docker 本地部署了 ollama 后,ollama 是给我们提供了很多的对接的接口的,可以直接使用网络请求的客户端工具进行实现集成吧,十分方便的讷,直接用就行了
python 提供的模拟网络请求客户端操作含有
httpx 库的使用
Ollama client
import httpx
import json
import time
from typing import List, Dict, Optional, Generator, AsyncGenerator, Union, Any
# 定义一些常用的类型别名,增强代码可读性
ModelInfo = Dict[str, Any]
GenerateResponse = Dict[str, Any]
ChatMessage = Dict[str, str]
EmbeddingVector = List[float]
class OllamaClient:
"""
一个全面的 Ollama API 客户端,使用 httpx 实现。
支持所有官方 API 并提供便捷的拓展功能。
"""
def __init__(self, base_url: str = "http://localhost:11434", timeout: float = 60.0):
self.base_url = base_url.rstrip('/')
self.timeout = timeout
# 创建可复用的同步和异步客户端
self._client = httpx.Client(timeout=self.timeout)
self._async_client = httpx.AsyncClient(timeout=self.timeout)
def __del__(self):
"""在对象销毁时关闭客户端连接池"""
if hasattr(self, '_client') and not self._client.is_closed:
self._client.close()
if hasattr(self, '_async_client') and not self._async_client.is_closed:
try:
# 异步关闭需要在事件循环中进行,如果循环已关闭,可能会报错
import asyncio
if asyncio.get_event_loop().is_running():
asyncio.create_task(self._async_client.aclose())
else:
# 如果事件循环已关闭,同步关闭
self._async_client.close()
except Exception:
self._async_client.close()
# --- 私有辅助方法 ---
def _handle_response(self, response: httpx.Response) -> Dict[str, Any]:
"""处理响应,检查状态码并解析JSON"""
try:
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
try:
error_data = e.response.json()
error_msg = error_data.get("error", str(e))
except Exception:
error_msg = str(e)
raise OllamaAPIError(f"API 请求失败: {error_msg}") from e
async def _async_handle_response(self, response: httpx.Response) -> Dict[str, Any]:
"""异步处理响应"""
try:
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
try:
error_data = e.response.json()
error_msg = error_data.get("error", str(e))
except Exception:
error_msg = str(e)
raise OllamaAPIError(f"API 请求失败: {error_msg}") from e
# --- 模型管理 API ---
def pull_model(self, model_name: str, stream: bool = False) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
"""
从 Ollama 库中拉取(下载)模型。
:param model_name: 模型名称,例如 "llama3:8b"
:param stream: 是否流式获取下载进度
:return: 如果 stream=True,返回一个生成器,否则返回最终状态
"""
url = f"{self.base_url}/api/pull"
payload = {"name": model_name, "stream": stream}
if not stream:
response = self._client.post(url, json=payload)
return self._handle_response(response)
else:
def stream_generator():
with self._client.stream("POST", url, json=payload) as response:
for line in response.iter_lines():
if line:
yield json.loads(line)
return stream_generator()
def push_model(self, model_name: str, stream: bool = False) -> Union[Dict[str, Any], Generator[Dict[str, Any], None, None]]:
"""
将本地模型推送到 Ollama 库。
:param model_name: 模型名称
:param stream: 是否流式获取上传进度
:return: 如果 stream=True,返回一个生成器,否则返回最终状态
"""
url = f"{self.base_url}/api/push"
payload = {"name": model_name, "stream": stream}
if not stream:
response = self._client.post(url, json=payload)
return self._handle_response(response)
else:
def stream_generator():
with self._client.stream("POST", url, json=payload) as response:
for line in response.iter_lines():
if line:
yield json.loads(line)
return stream_generator()
def list_models(self) -> List[ModelInfo]:
"""
列出所有已安装的本地模型。
:return: 模型信息列表
"""
url = f"{self.base_url}/api/tags"
response = self._client.get(url)
data = self._handle_response(response)
return data.get("models", [])
def show_model(self, model_name: str) -> ModelInfo:
"""
获取特定模型的详细信息。
:param model_name: 模型名称
:return: 模型详细信息
"""
url = f"{self.base_url}/api/show"
payload = {"name": model_name}
response = self._client.post(url, json=payload)
return self._handle_response(response)
def delete_model(self, model_name: str) -> Dict[str, Any]:
"""
删除本地模型。
:param model_name: 模型名称
:return: 操作结果
"""
url = f"{self.base_url}/api/delete"
payload = {"name": model_name}
response = self._client.delete(url, json=payload)
return self._handle_response(response)
# --- 生成与聊天 API ---
def generate(
self,
model: str,
prompt: str,
system: Optional[str] = None,
template: Optional[str] = None,
context: Optional[List[int]] = None,
stream: bool = False,
raw: bool = False,
options: Optional[Dict[str, Any]] = None,
) -> Union[GenerateResponse, Generator[GenerateResponse, None, None]]:
"""
根据提示生成文本。
:param model: 模型名称
:param prompt: 用户提示
:param system: 系统提示,用于指导模型行为
:param template: 自定义提示模板
:param context: 来自先前生成的上下文(用于多轮对话)
:param stream: 是否流式返回结果
:param raw: 是否返回原始的、未格式化的响应
:param options: 模型特定的生成选项,如 temperature, top_p 等
:return: 如果 stream=True,返回一个生成器,否则返回完整响应
"""
url = f"{self.base_url}/api/generate"
payload: Dict[str, Any] = {
"model": model,
"prompt": prompt,
"stream": stream,
"raw": raw,
}
if system: payload["system"] = system
if template: payload["template"] = template
if context: payload["context"] = context
if options: payload["options"] = options
if not stream:
response = self._client.post(url, json=payload)
return self._handle_response(response)
else:
def stream_generator():
with self._client.stream("POST", url, json=payload) as response:
for line in response.iter_lines():
if line:
yield json.loads(line)
return stream_generator()
def chat(
self,
model: str,
messages: List[ChatMessage],
stream: bool = False,
options: Optional[Dict[str, Any]] = None,
) -> Union[GenerateResponse, Generator[GenerateResponse, None, None]]:
"""
进行聊天对话。
:param model: 模型名称
:param messages: 消息历史列表,每个消息包含 "role" 和 "content"
:param stream: 是否流式返回结果
:param options: 模型特定的生成选项
:return: 如果 stream=True,返回一个生成器,否则返回完整响应
"""
url = f"{self.base_url}/api/chat"
payload: Dict[str, Any] = {
"model": model,
"messages": messages,
"stream": stream,
}
if options: payload["options"] = options
if not stream:
response = self._client.post(url, json=payload)
return self._handle_response(response)
else:
def stream_generator():
with self._client.stream("POST", url, json=payload) as response:
for line in response.iter_lines():
if line:
yield json.loads(line)
return stream_generator()
def create_embedding(self, model: str, prompt: str) -> EmbeddingVector:
"""
为给定的文本生成嵌入向量。
:param model: 模型名称(需支持嵌入功能)
:param prompt: 输入文本
:return: 嵌入向量
"""
url = f"{self.base_url}/api/embeddings"
payload = {"model": model, "prompt": prompt}
response = self._client.post(url, json=payload)
data = self._handle_response(response)
return data.get("embedding", [])
# --- 服务健康检查 ---
def is_healthy(self) -> bool:
"""
检查 Ollama 服务是否正在运行。
:return: 如果服务正常,返回 True,否则返回 False
"""
try:
response = self._client.get(f"{self.base_url}/")
return response.status_code == 200 and "Ollama is running" in response.text
except Exception:
return False
# --- 拓展功能 ---
def wait_for_service(self, timeout: float = 30.0) -> bool:
"""
等待 Ollama 服务启动。
:param timeout: 超时时间(秒)
:return: 如果在超时前服务启动,返回 True,否则返回 False
"""
start_time = time.time()
while time.time() - start_time < timeout:
if self.is_healthy():
return True
time.sleep(1)
return False
def find_model(self, model_name: str) -> Optional[ModelInfo]:
"""
在已安装的模型中查找特定模型。
:param model_name: 模型名称(可以是模糊匹配,如 "llama3")
:return: 如果找到,返回模型信息,否则返回 None
"""
models = self.list_models()
for model in models:
if model_name in model.get("name", ""):
return model
return None
# --- 异步 API ---
async def async_pull_model(self, model_name: str, stream: bool = False) -> Union[Dict[str, Any], AsyncGenerator[Dict[str, Any], None]]:
url = f"{self.base_url}/api/pull"
payload = {"name": model_name, "stream": stream}
if not stream:
response = await self._async_client.post(url, json=payload)
return await self._async_handle_response(response)
else:
async def stream_generator():
async with self._async_client.stream("POST", url, json=payload) as response:
async for line in response.aiter_lines():
if line:
yield json.loads(line)
return stream_generator()
async def async_list_models(self) -> List[ModelInfo]:
url = f"{self.base_url}/api/tags"
response = await self._async_client.get(url)
data = await self._async_handle_response(response)
return data.get("models", [])
async def async_generate(self, model: str, prompt: str, **kwargs) -> GenerateResponse:
url = f"{self.base_url}/api/generate"
kwargs.update({"model": model, "prompt": prompt, "stream": False})
response = await self._async_client.post(url, json=kwargs)
return await self._async_handle_response(response)
async def async_chat(self, model: str, messages: List[ChatMessage], **kwargs) -> GenerateResponse:
url = f"{self.base_url}/api/chat"
kwargs.update({"model": model, "messages": messages, "stream": False})
response = await self._async_client.post(url, json=kwargs)
return await self._async_handle_response(response)
async def async_create_embedding(self, model: str, prompt: str) -> EmbeddingVector:
url = f"{self.base_url}/api/embeddings"
payload = {"model": model, "prompt": prompt}
response = await self._async_client.post(url, json=payload)
data = await self._async_handle_response(response)
return data.get("embedding", [])
async def async_is_healthy(self) -> bool:
try:
response = await self._async_client.get(f"{self.base_url}/")
return response.status_code == 200 and "Ollama is running" in response.text
except Exception:
return False
class OllamaAPIError(Exception):
"""Ollama API 调用异常"""
pass