fastapi_celery/README.md
2025-12-25 16:43:26 +08:00

411 lines
11 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# FastAPI Celery Example
一个基于 FastAPI 和 Celery 的异步任务处理示例项目,展示了如何在现代 Python Web 应用中实现高效的异步任务队列。
## 项目介绍
本项目是一个完整的 FastAPI + Celery 应用示例,演示了如何:
- 使用 FastAPI 构建高性能的 REST API
- 集成 Celery 进行异步任务处理
- 使用 Redis 作为消息代理和结果后端
- 实现统一的日志管理系统
- 配置现代化的代码质量工具链
- 使用容器化技术简化部署
### 为什么选择这个架构?
- **FastAPI**: 现代化的异步 Web 框架,提供出色的性能和自动 API 文档生成
- **Celery**: 强大的分布式任务队列,支持多种消息代理
- **Redis**: 高速的内存数据库,适合作为任务队列的后端
- **Loguru**: 简单易用的日志库,支持结构化日志和文件轮转
- **uv**: 快速的 Python 包管理器,比 pip 更快
- **Ruff**: 快速的 Python 代码检查和格式化工具
- **Docker**: 容器化技术,确保环境一致性和易部署
## Features
- FastAPI web framework
- Celery for asynchronous tasks
- Redis as message broker and result backend
- Loguru for unified logging
- Docker containerization
- Code quality with Ruff and pre-commit hooks
- Dependency management with uv
## Project Structure
```
.
├── app/
│ ├── api/
│ │ ├── __init__.py
│ │ └── routes.py # API endpoints
│ ├── core/
│ │ ├── __init__.py
│ │ ├── celery_app.py # Celery configuration
│ │ ├── config.py # Application settings
│ │ └── logging.py # Loguru setup
│ └── tasks/
│ ├── __init__.py
│ └── tasks.py # Celery tasks
├── main.py # FastAPI application
├── pyproject.toml # Project dependencies and ruff config
├── Dockerfile # Docker image
├── docker-compose.yml # Multi-service setup
├── Makefile # Build and run commands
├── .pre-commit-config.yaml # Pre-commit hooks
└── README.md # This file
```
## Installation
### Prerequisites
- Python 3.11+
- uv (https://github.com/astral-sh/uv)
- Docker and Docker Compose (for containerized deployment)
### Local Development
1. Clone the repository:
```bash
git clone <repository-url>
cd fastapi-celery-example
```
2. Install dependencies:
```bash
make install
```
3. Install pre-commit hooks:
```bash
make pre-commit-install
```
## 架构说明
### 系统架构
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ FastAPI App │ │ Redis │ │ Celery Worker │
│ │◄──►│ Message Broker │◄──►│ │
│ • REST API │ │ Result Backend │ │ • Task Processing│
│ • Task Submission│ │ │ │ • Async Execution│
└─────────────────┘ └─────────────────┘ └─────────────────┘
```
### 工作流程
1. **客户端请求**: 用户通过 FastAPI 端点提交任务
2. **任务入队**: FastAPI 将任务发送到 Redis 消息队列
3. **异步处理**: Celery Worker 从队列中获取任务并异步执行
4. **结果存储**: 任务结果存储回 Redis
5. **结果查询**: 客户端可以通过任务 ID 查询执行结果
### 主要组件
- **app/api/routes.py**: FastAPI 路由定义,处理 HTTP 请求
- **app/core/celery_app.py**: Celery 应用配置
- **app/core/logging.py**: 统一日志配置
- **app/tasks/tasks.py**: 具体的任务实现
- **app/core/config.py**: 应用配置管理
## 用法指南
### Local Development
1. Start Redis (required for Celery):
```bash
docker run -d -p 6379:6379 redis:7-alpine
```
2. Run the FastAPI server:
```bash
make dev
```
3. In another terminal, run the Celery worker:
```bash
make worker
```
4. Visit http://localhost:8000 to access the API
### Docker Deployment
1. Build and start all services:
```bash
make up
```
2. The API will be available at http://localhost:8000
3. Stop services:
```bash
make down
```
## API 端点详解
### 基础端点
- `GET /` - 应用根端点,返回欢迎信息
### 任务提交端点
- `POST /add`
- **功能**: 提交两个整数的加法任务
- **请求体**: `{"x": int, "y": int}`
- **响应**: `{"task_id": "uuid", "status": "submitted", "message": "..."}`
- `POST /long-task`
- **功能**: 提交长时间运行的任务(用于演示异步处理)
- **请求体**: `{"duration": int}` (秒数)
- **响应**: `{"task_id": "uuid", "status": "submitted", "message": "..."}`
### 结果查询端点
- `GET /result/{task_id}`
- **功能**: 查询任务执行结果
- **响应状态**:
- `pending`: 任务正在执行
- `success`: 任务成功完成,返回结果
- `failure`: 任务执行失败,返回错误信息
## 详细使用示例
```bash
# Submit an addition task
curl -X POST "http://localhost:8000/add" \
-H "Content-Type: application/json" \
-d '{"x": 5, "y": 3}'
# Response: {"task_id": "uuid", "status": "submitted", "message": "Addition task submitted: 5 + 3"}
# Check result
curl "http://localhost:8000/result/uuid"
# Response: {"task_id": "uuid", "status": "success", "result": 8}
```
### 完整工作流程示例
```bash
# 1. 提交加法任务
curl -X POST "http://localhost:8000/add" \
-H "Content-Type: application/json" \
-d '{"x": 10, "y": 20}'
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "submitted", "message": "Addition task submitted: 10 + 20"}
# 2. 立即查询结果(可能还在处理中)
curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000"
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "pending"}
# 3. 稍后再次查询(任务完成)
curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000"
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "success", "result": 30}
# 4. 提交长时间任务
curl -X POST "http://localhost:8000/long-task" \
-H "Content-Type: application/json" \
-d '{"duration": 5}'
# 5. 在另一个终端查看 Celery worker 日志
# Worker 将处理任务并记录执行过程
```
### 错误处理
当任务执行失败时,结果查询会返回:
```json
{
"task_id": "uuid",
"status": "failure",
"error": "具体错误信息"
}
```
### 任务状态说明
- **PENDING**: 任务已提交,正在等待处理
- **PROGRESS**: 任务正在执行中(如果任务支持进度报告)
- **SUCCESS**: 任务成功完成,结果可用
- **FAILURE**: 任务执行失败,包含错误信息
- **RETRY**: 任务失败后正在重试(如果配置了重试机制)
## 监控和调试
### 日志查看
应用会生成两种日志输出:
1. **控制台日志**: 实时显示应用运行状态
2. **文件日志**: 保存到 `app.log` 文件,支持自动轮转
```bash
# 查看实时日志
tail -f app.log
# 查看 FastAPI 日志
make dev
# 查看 Celery worker 日志(新终端)
make worker
```
### 健康检查
```bash
# 检查 API 健康状态
curl http://localhost:8000/
# 检查 Redis 连接
docker exec -it <redis-container> redis-cli ping
```
### 常见问题排查
1. **任务一直处于 PENDING 状态**
- 检查 Celery worker 是否正在运行
- 确认 Redis 服务是否可访问
2. **连接 Redis 失败**
- 确保 Redis 容器正在运行
- 检查网络配置和端口映射
3. **任务执行失败**
- 查看 Celery worker 的错误日志
- 检查任务代码中的异常处理
### 性能监控
- 使用 `/result/{task_id}` 端点监控任务执行时间
- 查看 `app.log` 中的任务处理日志
- 监控 Redis 的内存和连接数
## 日志系统
All logs are written to both console and `app.log` file with rotation.
## 部署和扩展
### 生产环境部署
1. **使用 Docker Compose**:
```bash
make up
```
2. **水平扩展 Worker**:
```yaml
# 在 docker-compose.yml 中增加 worker 实例
worker:
deploy:
replicas: 3 # 根据负载调整
```
3. **负载均衡**:
- 使用 Nginx 或 Traefik 作为反向代理
- 配置多个 FastAPI 实例
### 扩展建议
1. **添加任务优先级**:
```python
@celery_app.task(bind=True, priority=10)
def high_priority_task(self):
# 高优先级任务
pass
```
2. **实现任务进度跟踪**:
```python
from celery import current_task
@celery_app.task(bind=True)
def task_with_progress(self):
for i in range(100):
current_task.update_state(state='PROGRESS', meta={'progress': i})
# 执行任务逻辑
```
3. **添加任务超时和重试**:
```python
@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
def robust_task(self):
# 任务逻辑
pass
```
4. **监控集成**:
- 集成 Prometheus/Grafana 进行监控
- 使用 Celery Events 进行任务监控
## 配置管理
Environment variables can be set in a `.env` file:
- `REDIS_URL`: Redis connection URL (default: redis://localhost:6379/0)
- `CELERY_BROKER_URL`: Celery broker URL (default: redis://localhost:6379/0)
- `CELERY_RESULT_BACKEND`: Celery result backend URL (default: redis://localhost:6379/0)
## 开发指南
### 添加新任务
1. 在 `app/tasks/tasks.py` 中定义新任务:
```python
@celery_app.task(bind=True)
def your_new_task(self, param1, param2):
log.info(f"Executing your_new_task with {param1}, {param2}")
# 任务逻辑
return result
```
2. 在 `app/api/routes.py` 中添加对应的 API 端点:
```python
@router.post("/your-task")
async def submit_your_task(param1: str, param2: int):
task = your_new_task.delay(param1, param2)
return {"task_id": task.id, "status": "submitted"}
```
### 代码规范
运行代码质量检查:
```bash
make lint # 运行 ruff 代码检查
make format # 格式化代码
make pre-commit-run # 运行所有 pre-commit 检查
```
代码规范要求:
- 使用 `uv run ruff check .` 检查代码质量
- 使用 `uv run ruff format .` 格式化代码
- 提交前运行 `make pre-commit-run` 确保所有检查通过
- 为所有新增功能添加适当的日志记录
### 测试建议
- 使用 pytest 编写单元测试
- 测试异步任务的提交和结果查询
- 验证错误处理和边界情况
- 使用测试 Redis 实例避免影响生产环境
## 贡献
欢迎提交 Issue 和 Pull Request在提交前请确保
1. 代码通过所有 linting 和格式化检查
2. 添加适当的测试用例
3. 更新相关文档
4. 遵循现有的代码风格和架构模式
## License
MIT License