11 KiB
11 KiB
FastAPI Celery Example
一个基于 FastAPI 和 Celery 的异步任务处理示例项目,展示了如何在现代 Python Web 应用中实现高效的异步任务队列。
项目介绍
本项目是一个完整的 FastAPI + Celery 应用示例,演示了如何:
- 使用 FastAPI 构建高性能的 REST API
- 集成 Celery 进行异步任务处理
- 使用 Redis 作为消息代理和结果后端
- 实现统一的日志管理系统
- 配置现代化的代码质量工具链
- 使用容器化技术简化部署
为什么选择这个架构?
- FastAPI: 现代化的异步 Web 框架,提供出色的性能和自动 API 文档生成
- Celery: 强大的分布式任务队列,支持多种消息代理
- Redis: 高速的内存数据库,适合作为任务队列的后端
- Loguru: 简单易用的日志库,支持结构化日志和文件轮转
- uv: 快速的 Python 包管理器,比 pip 更快
- Ruff: 快速的 Python 代码检查和格式化工具
- Docker: 容器化技术,确保环境一致性和易部署
Features
- FastAPI web framework
- Celery for asynchronous tasks
- Redis as message broker and result backend
- Loguru for unified logging
- Docker containerization
- Code quality with Ruff and pre-commit hooks
- Dependency management with uv
Project Structure
.
├── app/
│ ├── api/
│ │ ├── __init__.py
│ │ └── routes.py # API endpoints
│ ├── core/
│ │ ├── __init__.py
│ │ ├── celery_app.py # Celery configuration
│ │ ├── config.py # Application settings
│ │ └── logging.py # Loguru setup
│ └── tasks/
│ ├── __init__.py
│ └── tasks.py # Celery tasks
├── main.py # FastAPI application
├── pyproject.toml # Project dependencies and ruff config
├── Dockerfile # Docker image
├── docker-compose.yml # Multi-service setup
├── Makefile # Build and run commands
├── .pre-commit-config.yaml # Pre-commit hooks
└── README.md # This file
Installation
Prerequisites
- Python 3.11+
- uv (https://github.com/astral-sh/uv)
- Docker and Docker Compose (for containerized deployment)
Local Development
- Clone the repository:
git clone <repository-url>
cd fastapi-celery-example
- Install dependencies:
make install
- Install pre-commit hooks:
make pre-commit-install
架构说明
系统架构
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ FastAPI App │ │ Redis │ │ Celery Worker │
│ │◄──►│ Message Broker │◄──►│ │
│ • REST API │ │ Result Backend │ │ • Task Processing│
│ • Task Submission│ │ │ │ • Async Execution│
└─────────────────┘ └─────────────────┘ └─────────────────┘
工作流程
- 客户端请求: 用户通过 FastAPI 端点提交任务
- 任务入队: FastAPI 将任务发送到 Redis 消息队列
- 异步处理: Celery Worker 从队列中获取任务并异步执行
- 结果存储: 任务结果存储回 Redis
- 结果查询: 客户端可以通过任务 ID 查询执行结果
主要组件
- app/api/routes.py: FastAPI 路由定义,处理 HTTP 请求
- app/core/celery_app.py: Celery 应用配置
- app/core/logging.py: 统一日志配置
- app/tasks/tasks.py: 具体的任务实现
- app/core/config.py: 应用配置管理
用法指南
Local Development
- Start Redis (required for Celery):
docker run -d -p 6379:6379 redis:7-alpine
- Run the FastAPI server:
make dev
- In another terminal, run the Celery worker:
make worker
- Visit http://localhost:8000 to access the API
Docker Deployment
- Build and start all services:
make up
-
The API will be available at http://localhost:8000
-
Stop services:
make down
API 端点详解
基础端点
GET /- 应用根端点,返回欢迎信息
任务提交端点
-
POST /add- 功能: 提交两个整数的加法任务
- 请求体:
{"x": int, "y": int} - 响应:
{"task_id": "uuid", "status": "submitted", "message": "..."}
-
POST /long-task- 功能: 提交长时间运行的任务(用于演示异步处理)
- 请求体:
{"duration": int}(秒数) - 响应:
{"task_id": "uuid", "status": "submitted", "message": "..."}
结果查询端点
GET /result/{task_id}- 功能: 查询任务执行结果
- 响应状态:
pending: 任务正在执行success: 任务成功完成,返回结果failure: 任务执行失败,返回错误信息
详细使用示例
# Submit an addition task
curl -X POST "http://localhost:8000/add" \
-H "Content-Type: application/json" \
-d '{"x": 5, "y": 3}'
# Response: {"task_id": "uuid", "status": "submitted", "message": "Addition task submitted: 5 + 3"}
# Check result
curl "http://localhost:8000/result/uuid"
# Response: {"task_id": "uuid", "status": "success", "result": 8}
完整工作流程示例
# 1. 提交加法任务
curl -X POST "http://localhost:8000/add" \
-H "Content-Type: application/json" \
-d '{"x": 10, "y": 20}'
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "submitted", "message": "Addition task submitted: 10 + 20"}
# 2. 立即查询结果(可能还在处理中)
curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000"
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "pending"}
# 3. 稍后再次查询(任务完成)
curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000"
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "success", "result": 30}
# 4. 提交长时间任务
curl -X POST "http://localhost:8000/long-task" \
-H "Content-Type: application/json" \
-d '{"duration": 5}'
# 5. 在另一个终端查看 Celery worker 日志
# Worker 将处理任务并记录执行过程
错误处理
当任务执行失败时,结果查询会返回:
{
"task_id": "uuid",
"status": "failure",
"error": "具体错误信息"
}
任务状态说明
- PENDING: 任务已提交,正在等待处理
- PROGRESS: 任务正在执行中(如果任务支持进度报告)
- SUCCESS: 任务成功完成,结果可用
- FAILURE: 任务执行失败,包含错误信息
- RETRY: 任务失败后正在重试(如果配置了重试机制)
监控和调试
日志查看
应用会生成两种日志输出:
- 控制台日志: 实时显示应用运行状态
- 文件日志: 保存到
app.log文件,支持自动轮转
# 查看实时日志
tail -f app.log
# 查看 FastAPI 日志
make dev
# 查看 Celery worker 日志(新终端)
make worker
健康检查
# 检查 API 健康状态
curl http://localhost:8000/
# 检查 Redis 连接
docker exec -it <redis-container> redis-cli ping
常见问题排查
-
任务一直处于 PENDING 状态
- 检查 Celery worker 是否正在运行
- 确认 Redis 服务是否可访问
-
连接 Redis 失败
- 确保 Redis 容器正在运行
- 检查网络配置和端口映射
-
任务执行失败
- 查看 Celery worker 的错误日志
- 检查任务代码中的异常处理
性能监控
- 使用
/result/{task_id}端点监控任务执行时间 - 查看
app.log中的任务处理日志 - 监控 Redis 的内存和连接数
日志系统
All logs are written to both console and app.log file with rotation.
部署和扩展
生产环境部署
-
使用 Docker Compose:
make up -
水平扩展 Worker:
# 在 docker-compose.yml 中增加 worker 实例 worker: deploy: replicas: 3 # 根据负载调整 -
负载均衡:
- 使用 Nginx 或 Traefik 作为反向代理
- 配置多个 FastAPI 实例
扩展建议
-
添加任务优先级:
@celery_app.task(bind=True, priority=10) def high_priority_task(self): # 高优先级任务 pass -
实现任务进度跟踪:
from celery import current_task @celery_app.task(bind=True) def task_with_progress(self): for i in range(100): current_task.update_state(state='PROGRESS', meta={'progress': i}) # 执行任务逻辑 -
添加任务超时和重试:
@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3}) def robust_task(self): # 任务逻辑 pass -
监控集成:
- 集成 Prometheus/Grafana 进行监控
- 使用 Celery Events 进行任务监控
配置管理
Environment variables can be set in a .env file:
REDIS_URL: Redis connection URL (default: redis://localhost:6379/0)CELERY_BROKER_URL: Celery broker URL (default: redis://localhost:6379/0)CELERY_RESULT_BACKEND: Celery result backend URL (default: redis://localhost:6379/0)
开发指南
添加新任务
-
在
app/tasks/tasks.py中定义新任务:@celery_app.task(bind=True) def your_new_task(self, param1, param2): log.info(f"Executing your_new_task with {param1}, {param2}") # 任务逻辑 return result -
在
app/api/routes.py中添加对应的 API 端点:@router.post("/your-task") async def submit_your_task(param1: str, param2: int): task = your_new_task.delay(param1, param2) return {"task_id": task.id, "status": "submitted"}
代码规范
运行代码质量检查:
make lint # 运行 ruff 代码检查
make format # 格式化代码
make pre-commit-run # 运行所有 pre-commit 检查
代码规范要求:
- 使用
uv run ruff check .检查代码质量 - 使用
uv run ruff format .格式化代码 - 提交前运行
make pre-commit-run确保所有检查通过 - 为所有新增功能添加适当的日志记录
测试建议
- 使用 pytest 编写单元测试
- 测试异步任务的提交和结果查询
- 验证错误处理和边界情况
- 使用测试 Redis 实例避免影响生产环境
贡献
欢迎提交 Issue 和 Pull Request!在提交前请确保:
- 代码通过所有 linting 和格式化检查
- 添加适当的测试用例
- 更新相关文档
- 遵循现有的代码风格和架构模式
License
MIT License