411 lines
11 KiB
Markdown
411 lines
11 KiB
Markdown
# FastAPI Celery Example
|
||
|
||
一个基于 FastAPI 和 Celery 的异步任务处理示例项目,展示了如何在现代 Python Web 应用中实现高效的异步任务队列。
|
||
|
||
## 项目介绍
|
||
|
||
本项目是一个完整的 FastAPI + Celery 应用示例,演示了如何:
|
||
|
||
- 使用 FastAPI 构建高性能的 REST API
|
||
- 集成 Celery 进行异步任务处理
|
||
- 使用 Redis 作为消息代理和结果后端
|
||
- 实现统一的日志管理系统
|
||
- 配置现代化的代码质量工具链
|
||
- 使用容器化技术简化部署
|
||
|
||
### 为什么选择这个架构?
|
||
|
||
- **FastAPI**: 现代化的异步 Web 框架,提供出色的性能和自动 API 文档生成
|
||
- **Celery**: 强大的分布式任务队列,支持多种消息代理
|
||
- **Redis**: 高速的内存数据库,适合作为任务队列的后端
|
||
- **Loguru**: 简单易用的日志库,支持结构化日志和文件轮转
|
||
- **uv**: 快速的 Python 包管理器,比 pip 更快
|
||
- **Ruff**: 快速的 Python 代码检查和格式化工具
|
||
- **Docker**: 容器化技术,确保环境一致性和易部署
|
||
|
||
## Features
|
||
|
||
- FastAPI web framework
|
||
- Celery for asynchronous tasks
|
||
- Redis as message broker and result backend
|
||
- Loguru for unified logging
|
||
- Docker containerization
|
||
- Code quality with Ruff and pre-commit hooks
|
||
- Dependency management with uv
|
||
|
||
## Project Structure
|
||
|
||
```
|
||
.
|
||
├── app/
|
||
│ ├── api/
|
||
│ │ ├── __init__.py
|
||
│ │ └── routes.py # API endpoints
|
||
│ ├── core/
|
||
│ │ ├── __init__.py
|
||
│ │ ├── celery_app.py # Celery configuration
|
||
│ │ ├── config.py # Application settings
|
||
│ │ └── logging.py # Loguru setup
|
||
│ └── tasks/
|
||
│ ├── __init__.py
|
||
│ └── tasks.py # Celery tasks
|
||
├── main.py # FastAPI application
|
||
├── pyproject.toml # Project dependencies and ruff config
|
||
├── Dockerfile # Docker image
|
||
├── docker-compose.yml # Multi-service setup
|
||
├── Makefile # Build and run commands
|
||
├── .pre-commit-config.yaml # Pre-commit hooks
|
||
└── README.md # This file
|
||
```
|
||
|
||
## Installation
|
||
|
||
### Prerequisites
|
||
|
||
- Python 3.11+
|
||
- uv (https://github.com/astral-sh/uv)
|
||
- Docker and Docker Compose (for containerized deployment)
|
||
|
||
### Local Development
|
||
|
||
1. Clone the repository:
|
||
```bash
|
||
git clone <repository-url>
|
||
cd fastapi-celery-example
|
||
```
|
||
|
||
2. Install dependencies:
|
||
```bash
|
||
make install
|
||
```
|
||
|
||
3. Install pre-commit hooks:
|
||
```bash
|
||
make pre-commit-install
|
||
```
|
||
|
||
## 架构说明
|
||
|
||
### 系统架构
|
||
|
||
```
|
||
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
|
||
│ FastAPI App │ │ Redis │ │ Celery Worker │
|
||
│ │◄──►│ Message Broker │◄──►│ │
|
||
│ • REST API │ │ Result Backend │ │ • Task Processing│
|
||
│ • Task Submission│ │ │ │ • Async Execution│
|
||
└─────────────────┘ └─────────────────┘ └─────────────────┘
|
||
```
|
||
|
||
### 工作流程
|
||
|
||
1. **客户端请求**: 用户通过 FastAPI 端点提交任务
|
||
2. **任务入队**: FastAPI 将任务发送到 Redis 消息队列
|
||
3. **异步处理**: Celery Worker 从队列中获取任务并异步执行
|
||
4. **结果存储**: 任务结果存储回 Redis
|
||
5. **结果查询**: 客户端可以通过任务 ID 查询执行结果
|
||
|
||
### 主要组件
|
||
|
||
- **app/api/routes.py**: FastAPI 路由定义,处理 HTTP 请求
|
||
- **app/core/celery_app.py**: Celery 应用配置
|
||
- **app/core/logging.py**: 统一日志配置
|
||
- **app/tasks/tasks.py**: 具体的任务实现
|
||
- **app/core/config.py**: 应用配置管理
|
||
|
||
## 用法指南
|
||
|
||
### Local Development
|
||
|
||
1. Start Redis (required for Celery):
|
||
```bash
|
||
docker run -d -p 6379:6379 redis:7-alpine
|
||
```
|
||
|
||
2. Run the FastAPI server:
|
||
```bash
|
||
make dev
|
||
```
|
||
|
||
3. In another terminal, run the Celery worker:
|
||
```bash
|
||
make worker
|
||
```
|
||
|
||
4. Visit http://localhost:8000 to access the API
|
||
|
||
### Docker Deployment
|
||
|
||
1. Build and start all services:
|
||
```bash
|
||
make up
|
||
```
|
||
|
||
2. The API will be available at http://localhost:8000
|
||
|
||
3. Stop services:
|
||
```bash
|
||
make down
|
||
```
|
||
|
||
## API 端点详解
|
||
|
||
### 基础端点
|
||
|
||
- `GET /` - 应用根端点,返回欢迎信息
|
||
|
||
### 任务提交端点
|
||
|
||
- `POST /add`
|
||
- **功能**: 提交两个整数的加法任务
|
||
- **请求体**: `{"x": int, "y": int}`
|
||
- **响应**: `{"task_id": "uuid", "status": "submitted", "message": "..."}`
|
||
|
||
- `POST /long-task`
|
||
- **功能**: 提交长时间运行的任务(用于演示异步处理)
|
||
- **请求体**: `{"duration": int}` (秒数)
|
||
- **响应**: `{"task_id": "uuid", "status": "submitted", "message": "..."}`
|
||
|
||
### 结果查询端点
|
||
|
||
- `GET /result/{task_id}`
|
||
- **功能**: 查询任务执行结果
|
||
- **响应状态**:
|
||
- `pending`: 任务正在执行
|
||
- `success`: 任务成功完成,返回结果
|
||
- `failure`: 任务执行失败,返回错误信息
|
||
|
||
## 详细使用示例
|
||
|
||
```bash
|
||
# Submit an addition task
|
||
curl -X POST "http://localhost:8000/add" \
|
||
-H "Content-Type: application/json" \
|
||
-d '{"x": 5, "y": 3}'
|
||
|
||
# Response: {"task_id": "uuid", "status": "submitted", "message": "Addition task submitted: 5 + 3"}
|
||
|
||
# Check result
|
||
curl "http://localhost:8000/result/uuid"
|
||
|
||
# Response: {"task_id": "uuid", "status": "success", "result": 8}
|
||
```
|
||
|
||
### 完整工作流程示例
|
||
|
||
```bash
|
||
# 1. 提交加法任务
|
||
curl -X POST "http://localhost:8000/add" \
|
||
-H "Content-Type: application/json" \
|
||
-d '{"x": 10, "y": 20}'
|
||
|
||
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "submitted", "message": "Addition task submitted: 10 + 20"}
|
||
|
||
# 2. 立即查询结果(可能还在处理中)
|
||
curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000"
|
||
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "pending"}
|
||
|
||
# 3. 稍后再次查询(任务完成)
|
||
curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000"
|
||
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "success", "result": 30}
|
||
|
||
# 4. 提交长时间任务
|
||
curl -X POST "http://localhost:8000/long-task" \
|
||
-H "Content-Type: application/json" \
|
||
-d '{"duration": 5}'
|
||
|
||
# 5. 在另一个终端查看 Celery worker 日志
|
||
# Worker 将处理任务并记录执行过程
|
||
```
|
||
|
||
### 错误处理
|
||
|
||
当任务执行失败时,结果查询会返回:
|
||
|
||
```json
|
||
{
|
||
"task_id": "uuid",
|
||
"status": "failure",
|
||
"error": "具体错误信息"
|
||
}
|
||
```
|
||
|
||
### 任务状态说明
|
||
|
||
- **PENDING**: 任务已提交,正在等待处理
|
||
- **PROGRESS**: 任务正在执行中(如果任务支持进度报告)
|
||
- **SUCCESS**: 任务成功完成,结果可用
|
||
- **FAILURE**: 任务执行失败,包含错误信息
|
||
- **RETRY**: 任务失败后正在重试(如果配置了重试机制)
|
||
|
||
## 监控和调试
|
||
|
||
### 日志查看
|
||
|
||
应用会生成两种日志输出:
|
||
|
||
1. **控制台日志**: 实时显示应用运行状态
|
||
2. **文件日志**: 保存到 `app.log` 文件,支持自动轮转
|
||
|
||
```bash
|
||
# 查看实时日志
|
||
tail -f app.log
|
||
|
||
# 查看 FastAPI 日志
|
||
make dev
|
||
|
||
# 查看 Celery worker 日志(新终端)
|
||
make worker
|
||
```
|
||
|
||
### 健康检查
|
||
|
||
```bash
|
||
# 检查 API 健康状态
|
||
curl http://localhost:8000/
|
||
|
||
# 检查 Redis 连接
|
||
docker exec -it <redis-container> redis-cli ping
|
||
```
|
||
|
||
### 常见问题排查
|
||
|
||
1. **任务一直处于 PENDING 状态**
|
||
- 检查 Celery worker 是否正在运行
|
||
- 确认 Redis 服务是否可访问
|
||
|
||
2. **连接 Redis 失败**
|
||
- 确保 Redis 容器正在运行
|
||
- 检查网络配置和端口映射
|
||
|
||
3. **任务执行失败**
|
||
- 查看 Celery worker 的错误日志
|
||
- 检查任务代码中的异常处理
|
||
|
||
### 性能监控
|
||
|
||
- 使用 `/result/{task_id}` 端点监控任务执行时间
|
||
- 查看 `app.log` 中的任务处理日志
|
||
- 监控 Redis 的内存和连接数
|
||
|
||
## 日志系统
|
||
|
||
All logs are written to both console and `app.log` file with rotation.
|
||
|
||
## 部署和扩展
|
||
|
||
### 生产环境部署
|
||
|
||
1. **使用 Docker Compose**:
|
||
```bash
|
||
make up
|
||
```
|
||
|
||
2. **水平扩展 Worker**:
|
||
```yaml
|
||
# 在 docker-compose.yml 中增加 worker 实例
|
||
worker:
|
||
deploy:
|
||
replicas: 3 # 根据负载调整
|
||
```
|
||
|
||
3. **负载均衡**:
|
||
- 使用 Nginx 或 Traefik 作为反向代理
|
||
- 配置多个 FastAPI 实例
|
||
|
||
### 扩展建议
|
||
|
||
1. **添加任务优先级**:
|
||
```python
|
||
@celery_app.task(bind=True, priority=10)
|
||
def high_priority_task(self):
|
||
# 高优先级任务
|
||
pass
|
||
```
|
||
|
||
2. **实现任务进度跟踪**:
|
||
```python
|
||
from celery import current_task
|
||
|
||
@celery_app.task(bind=True)
|
||
def task_with_progress(self):
|
||
for i in range(100):
|
||
current_task.update_state(state='PROGRESS', meta={'progress': i})
|
||
# 执行任务逻辑
|
||
```
|
||
|
||
3. **添加任务超时和重试**:
|
||
```python
|
||
@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
|
||
def robust_task(self):
|
||
# 任务逻辑
|
||
pass
|
||
```
|
||
|
||
4. **监控集成**:
|
||
- 集成 Prometheus/Grafana 进行监控
|
||
- 使用 Celery Events 进行任务监控
|
||
|
||
## 配置管理
|
||
|
||
Environment variables can be set in a `.env` file:
|
||
|
||
- `REDIS_URL`: Redis connection URL (default: redis://localhost:6379/0)
|
||
- `CELERY_BROKER_URL`: Celery broker URL (default: redis://localhost:6379/0)
|
||
- `CELERY_RESULT_BACKEND`: Celery result backend URL (default: redis://localhost:6379/0)
|
||
|
||
## 开发指南
|
||
|
||
### 添加新任务
|
||
|
||
1. 在 `app/tasks/tasks.py` 中定义新任务:
|
||
```python
|
||
@celery_app.task(bind=True)
|
||
def your_new_task(self, param1, param2):
|
||
log.info(f"Executing your_new_task with {param1}, {param2}")
|
||
# 任务逻辑
|
||
return result
|
||
```
|
||
|
||
2. 在 `app/api/routes.py` 中添加对应的 API 端点:
|
||
```python
|
||
@router.post("/your-task")
|
||
async def submit_your_task(param1: str, param2: int):
|
||
task = your_new_task.delay(param1, param2)
|
||
return {"task_id": task.id, "status": "submitted"}
|
||
```
|
||
|
||
### 代码规范
|
||
|
||
运行代码质量检查:
|
||
```bash
|
||
make lint # 运行 ruff 代码检查
|
||
make format # 格式化代码
|
||
make pre-commit-run # 运行所有 pre-commit 检查
|
||
```
|
||
|
||
代码规范要求:
|
||
- 使用 `uv run ruff check .` 检查代码质量
|
||
- 使用 `uv run ruff format .` 格式化代码
|
||
- 提交前运行 `make pre-commit-run` 确保所有检查通过
|
||
- 为所有新增功能添加适当的日志记录
|
||
|
||
### 测试建议
|
||
|
||
- 使用 pytest 编写单元测试
|
||
- 测试异步任务的提交和结果查询
|
||
- 验证错误处理和边界情况
|
||
- 使用测试 Redis 实例避免影响生产环境
|
||
|
||
## 贡献
|
||
|
||
欢迎提交 Issue 和 Pull Request!在提交前请确保:
|
||
|
||
1. 代码通过所有 linting 和格式化检查
|
||
2. 添加适当的测试用例
|
||
3. 更新相关文档
|
||
4. 遵循现有的代码风格和架构模式
|
||
|
||
## License
|
||
|
||
MIT License |