深入学习 Project N.E.K.O. 项目的后端开发,包括基于 FastAPI 的异步 Web 服务器、API 开发、AI 集成、记忆系统等内容。
Project N.E.K.O. 项目的后端采用多服务器架构:
main_server.py)
memory_server.py)
launcher.py)
💡 提示:关于服务器启动和端口配置,请参考 环境搭建指南 - 启动服务器。关于项目结构,请参考 项目架构与目录结构。
main_server.py # 主服务器入口
├── config/ # 配置管理
│ ├── api_providers.json
│ ├── prompts_chara.py
│ └── prompts_sys.py
├── main_helper/ # 核心对话模块
│ ├── core.py # 对话核心逻辑
│ ├── omni_realtime_client.py
│ ├── omni_offline_client.py
│ └── tts_helper.py
├── brain/ # AI 智能体模块
│ ├── analyzer.py
│ ├── planner.py
│ ├── processor.py
│ └── task_executor.py
├── memory/ # 记忆管理
│ ├── router.py
│ ├── semantic.py
│ └── recent.py
└── utils/ # 工具函数
├── config_manager.py
├── preferences.py
└── frontend_utils.py
# 使用 uv(推荐)
uv run python main_server.py
# 或激活虚拟环境后
python main_server.py
# 启动记忆服务器(需要单独终端)
uv run python memory_server.py
💡 提示:关于环境配置,请参考 环境搭建指南。
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.get("/api/endpoint")
async def your_handler():
# 异步处理逻辑
return {"message": "success"}
@app.post("/api/endpoint")
async def your_post_handler(request: Request):
data = await request.json()
# 处理逻辑
return {"message": "success", "data": data}
from fastapi import FastAPI
from datetime import datetime
@app.get("/api/health")
async def health_check():
"""健康检查端点"""
return {
"status": "ok",
"timestamp": datetime.now().isoformat()
}
from fastapi import FastAPI, Request
from pydantic import BaseModel
class UserCreate(BaseModel):
name: str
email: str
@app.post("/api/user")
async def create_user(user: UserCreate):
"""创建用户"""
# Pydantic 自动验证数据
# 处理逻辑
return {
"message": "User created",
"user_id": "123"
}
@app.get("/api/user/{user_id}")
async def get_user(user_id: int):
"""获取用户信息"""
# FastAPI 自动转换类型
return {"user_id": user_id}
@app.get("/api/items")
async def get_items(skip: int = 0, limit: int = 10):
"""获取项目列表"""
return {"skip": skip, "limit": limit}
from pydantic import BaseModel, EmailStr, Field
class UserCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
email: EmailStr
age: int = Field(..., ge=0, le=150)
@app.post("/api/user")
async def create_user(user: UserCreate):
# 数据已自动验证
return {"message": "User created", "user": user}
from fastapi import HTTPException
@app.get("/api/user/{user_id}")
async def get_user(user_id: int):
if user_id < 1:
raise HTTPException(status_code=400, detail="Invalid user_id")
# 如果用户不存在
if not user_exists(user_id):
raise HTTPException(status_code=404, detail="User not found")
return {"user_id": user_id}
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": exc.errors(), "body": exc.body}
)
N.E.K.O 使用 WebSocket 进行实时对话,支持流式响应:
from fastapi import WebSocket, WebSocketDisconnect
from main_helper import core
@app.websocket("/ws/{lanlan_name}")
async def websocket_endpoint(websocket: WebSocket, lanlan_name: str):
"""WebSocket 对话端点"""
await websocket.accept()
try:
# 创建会话管理器
session_manager = core.LLMSessionManager(
sync_message_queue=None,
lanlan_name=lanlan_name,
lanlan_prompt="你的角色提示词"
)
while True:
# 接收消息
data = await websocket.receive_json()
message = data.get("message", "")
# 处理对话(异步)
response = await process_message_async(message, session_manager)
# 发送响应
await websocket.send_json({"response": response})
except WebSocketDisconnect:
# 清理资源
pass
from brain.analyzer import analyze_intent
from brain.planner import plan_task
from brain.processor import execute_task
from brain.task_executor import TaskExecutor
@app.post("/api/agent/task")
async def agent_task(request: Request):
"""处理智能体任务"""
data = await request.json()
user_request = data.get("request", "")
# 使用任务执行器
executor = TaskExecutor()
result = await executor.execute_task(user_request)
return {
"success": True,
"result": result
}
编辑 config/api_providers.json:
{
"core": {
"stepfun": {
"name": "阶跃星辰",
"base_url": "https://api.stepfun.com/v1",
"models": ["step-1-8k", "step-1-32k"]
},
"qwen": {
"name": "通义千问",
"base_url": "https://dashscope.aliyuncs.com/api/v1",
"models": ["qwen-turbo", "qwen-plus"]
}
}
}
💡 提示:关于 API 配置,请参考 N.E.K.O 项目的
config/api_providers.json文件。
记忆系统运行在独立的服务器上(memory_server.py),通过 HTTP 请求访问。
import httpx
async def store_memory(content: str, metadata: dict = None):
"""存储记忆到记忆服务器"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"http://localhost:48912/api/memory",
json={
"content": content,
"metadata": metadata or {}
}
)
return response.json()
# 在 API 端点中使用
@app.post("/api/memory")
async def store_user_memory(request: Request):
data = await request.json()
result = await store_memory(
content=data.get("content"),
metadata=data.get("metadata", {})
)
return result
async def search_memories(query: str, limit: int = 5):
"""从记忆服务器检索记忆"""
async with httpx.AsyncClient() as client:
response = await client.post(
f"http://localhost:48912/api/memory/search",
json={
"query": query,
"limit": limit
}
)
return response.json()
@app.post("/api/memory/search")
async def search_memory(request: Request):
data = await request.json()
query = data.get("query", "")
results = await search_memories(query, limit=5)
return results
N.E.K.O 提供了 main_helper.cross_server 模块来简化跨服务器通信:
from main_helper import cross_server
# 存储记忆
memory_id = await cross_server.store_memory(
content="用户喜欢喝咖啡",
metadata={"type": "preference"}
)
# 检索记忆
memories = await cross_server.search_memories("用户偏好", limit=5)
import json
import os
def load_data(filepath):
if os.path.exists(filepath):
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def save_data(filepath, data):
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# 使用示例
data = load_data('data/users.json')
data['user_123'] = {'name': 'Alice'}
save_data('data/users.json', data)
import sqlite3
def get_db():
conn = sqlite3.connect('database.db')
conn.row_factory = sqlite3.Row
return conn
@app.route('/api/users', methods=['GET'])
def get_users():
db = get_db()
users = db.execute('SELECT * FROM users').fetchall()
db.close()
return jsonify([dict(user) for user in users])
N.E.K.O 使用统一的日志配置:
from utils.logger_config import setup_logging
import logging
# 设置日志
logger, log_config = setup_logging(
service_name="MainServer",
log_level=logging.INFO
)
@app.get("/api/test")
async def test():
logger.debug("Debug message")
logger.info("Info message")
logger.warning("Warning message")
logger.error("Error message")
return {"status": "ok"}
FastAPI 自动生成交互式 API 文档:
http://localhost:48911/docshttp://localhost:48911/redoc使用 curl 测试:
# GET 请求
curl http://localhost:48911/api/health
# POST 请求
curl -X POST http://localhost:48911/api/preferences \
-H "Content-Type: application/json" \
-d '{"model_path": "/path/to/model"}'
使用 Python httpx(异步):
import httpx
async def test_api():
async with httpx.AsyncClient() as client:
# GET 请求
response = await client.get("http://localhost:48911/api/health")
print(response.json())
# POST 请求
response = await client.post(
"http://localhost:48911/api/preferences",
json={"model_path": "/path/to/model"}
)
print(response.json())
使用 Python requests(同步):
import requests
response = requests.get("http://localhost:48911/api/health")
print(response.json())
response = requests.post(
"http://localhost:48911/api/preferences",
json={"model_path": "/path/to/model"}
)
print(response.json())
FastAPI 内置 CORS 支持:
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# 添加 CORS 中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有来源(生产环境应指定具体域名)
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
from fastapi import File, UploadFile
import os
UPLOAD_FOLDER = "uploads"
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif"}
def allowed_file(filename: str) -> bool:
return "." in filename and \
filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...)):
if not allowed_file(file.filename):
raise HTTPException(status_code=400, detail="Invalid file type")
# 保存文件
file_path = os.path.join(UPLOAD_FOLDER, file.filename)
with open(file_path, "wb") as f:
content = await file.read()
f.write(content)
return {
"message": "File uploaded",
"filename": file.filename
}
FastAPI 原生支持 WebSocket:
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收消息
data = await websocket.receive_json()
print(f"Received: {data}")
# 发送响应
await websocket.send_json({
"message": "Server received",
"data": data
})
except WebSocketDisconnect:
print("Client disconnected")
from fastapi.staticfiles import StaticFiles
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory="static"), name="static")
from fastapi.templating import Jinja2Templates
from fastapi import Request
from fastapi.responses import HTMLResponse
templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
return templates.TemplateResponse(
"index.html",
{"request": request, "title": "N.E.K.O"}
)
from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_function(arg: str) -> str:
# 耗时操作
return arg.upper()
# 在异步函数中使用
@app.get("/api/cached")
async def cached_endpoint():
result = expensive_function("test")
return {"result": result}
FastAPI 原生支持异步,充分利用 async/await:
import asyncio
async def background_task(data: dict):
"""后台异步任务"""
await asyncio.sleep(1)
# 处理任务
pass
@app.post("/api/async")
async def async_task(request: Request):
data = await request.json()
# 启动后台任务(不等待完成)
asyncio.create_task(background_task(data))
return {"message": "Task started"}
如果使用数据库,建议使用连接池:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine("sqlite+aiosqlite:///./database.db")
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
使用中间件压缩响应:
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
在开始后端开发之前,建议先阅读:
完成后端开发学习后,可以继续学习:
遇到问题? 在 GitHub Issues 提交你的问题,我们会及时解答!