# FastAPI Celery Example 一个基于 FastAPI 和 Celery 的异步任务处理示例项目,展示了如何在现代 Python Web 应用中实现高效的异步任务队列。 ## 项目介绍 本项目是一个完整的 FastAPI + Celery 应用示例,演示了如何: - 使用 FastAPI 构建高性能的 REST API - 集成 Celery 进行异步任务处理 - 使用 Redis 作为消息代理和结果后端 - 实现统一的日志管理系统 - 配置现代化的代码质量工具链 - 使用容器化技术简化部署 ### 为什么选择这个架构? - **FastAPI**: 现代化的异步 Web 框架,提供出色的性能和自动 API 文档生成 - **Celery**: 强大的分布式任务队列,支持多种消息代理 - **Redis**: 高速的内存数据库,适合作为任务队列的后端 - **Loguru**: 简单易用的日志库,支持结构化日志和文件轮转 - **uv**: 快速的 Python 包管理器,比 pip 更快 - **Ruff**: 快速的 Python 代码检查和格式化工具 - **Docker**: 容器化技术,确保环境一致性和易部署 ## Features - FastAPI web framework - Celery for asynchronous tasks - Redis as message broker and result backend - Loguru for unified logging - Docker containerization - Code quality with Ruff and pre-commit hooks - Dependency management with uv ## Project Structure ``` . ├── app/ │ ├── api/ │ │ ├── __init__.py │ │ └── routes.py # API endpoints │ ├── core/ │ │ ├── __init__.py │ │ ├── celery_app.py # Celery configuration │ │ ├── config.py # Application settings │ │ └── logging.py # Loguru setup │ └── tasks/ │ ├── __init__.py │ └── tasks.py # Celery tasks ├── main.py # FastAPI application ├── pyproject.toml # Project dependencies and ruff config ├── Dockerfile # Docker image ├── docker-compose.yml # Multi-service setup ├── Makefile # Build and run commands ├── .pre-commit-config.yaml # Pre-commit hooks └── README.md # This file ``` ## Installation ### Prerequisites - Python 3.11+ - uv (https://github.com/astral-sh/uv) - Docker and Docker Compose (for containerized deployment) ### Local Development 1. Clone the repository: ```bash git clone cd fastapi-celery-example ``` 2. Install dependencies: ```bash make install ``` 3. Install pre-commit hooks: ```bash make pre-commit-install ``` ## 架构说明 ### 系统架构 ``` ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ FastAPI App │ │ Redis │ │ Celery Worker │ │ │◄──►│ Message Broker │◄──►│ │ │ • REST API │ │ Result Backend │ │ • Task Processing│ │ • Task Submission│ │ │ │ • Async Execution│ └─────────────────┘ └─────────────────┘ └─────────────────┘ ``` ### 工作流程 1. **客户端请求**: 用户通过 FastAPI 端点提交任务 2. **任务入队**: FastAPI 将任务发送到 Redis 消息队列 3. **异步处理**: Celery Worker 从队列中获取任务并异步执行 4. **结果存储**: 任务结果存储回 Redis 5. **结果查询**: 客户端可以通过任务 ID 查询执行结果 ### 主要组件 - **app/api/routes.py**: FastAPI 路由定义,处理 HTTP 请求 - **app/core/celery_app.py**: Celery 应用配置 - **app/core/logging.py**: 统一日志配置 - **app/tasks/tasks.py**: 具体的任务实现 - **app/core/config.py**: 应用配置管理 ## 用法指南 ### Local Development 1. Start Redis (required for Celery): ```bash docker run -d -p 6379:6379 redis:7-alpine ``` 2. Run the FastAPI server: ```bash make dev ``` 3. In another terminal, run the Celery worker: ```bash make worker ``` 4. Visit http://localhost:8000 to access the API ### Docker Deployment 1. Build and start all services: ```bash make up ``` 2. The API will be available at http://localhost:8000 3. Stop services: ```bash make down ``` ## API 端点详解 ### 基础端点 - `GET /` - 应用根端点,返回欢迎信息 ### 任务提交端点 - `POST /add` - **功能**: 提交两个整数的加法任务 - **请求体**: `{"x": int, "y": int}` - **响应**: `{"task_id": "uuid", "status": "submitted", "message": "..."}` - `POST /long-task` - **功能**: 提交长时间运行的任务(用于演示异步处理) - **请求体**: `{"duration": int}` (秒数) - **响应**: `{"task_id": "uuid", "status": "submitted", "message": "..."}` ### 结果查询端点 - `GET /result/{task_id}` - **功能**: 查询任务执行结果 - **响应状态**: - `pending`: 任务正在执行 - `success`: 任务成功完成,返回结果 - `failure`: 任务执行失败,返回错误信息 ## 详细使用示例 ```bash # Submit an addition task curl -X POST "http://localhost:8000/add" \ -H "Content-Type: application/json" \ -d '{"x": 5, "y": 3}' # Response: {"task_id": "uuid", "status": "submitted", "message": "Addition task submitted: 5 + 3"} # Check result curl "http://localhost:8000/result/uuid" # Response: {"task_id": "uuid", "status": "success", "result": 8} ``` ### 完整工作流程示例 ```bash # 1. 提交加法任务 curl -X POST "http://localhost:8000/add" \ -H "Content-Type: application/json" \ -d '{"x": 10, "y": 20}' # 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "submitted", "message": "Addition task submitted: 10 + 20"} # 2. 立即查询结果(可能还在处理中) curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000" # 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "pending"} # 3. 稍后再次查询(任务完成) curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000" # 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "success", "result": 30} # 4. 提交长时间任务 curl -X POST "http://localhost:8000/long-task" \ -H "Content-Type: application/json" \ -d '{"duration": 5}' # 5. 在另一个终端查看 Celery worker 日志 # Worker 将处理任务并记录执行过程 ``` ### 错误处理 当任务执行失败时,结果查询会返回: ```json { "task_id": "uuid", "status": "failure", "error": "具体错误信息" } ``` ### 任务状态说明 - **PENDING**: 任务已提交,正在等待处理 - **PROGRESS**: 任务正在执行中(如果任务支持进度报告) - **SUCCESS**: 任务成功完成,结果可用 - **FAILURE**: 任务执行失败,包含错误信息 - **RETRY**: 任务失败后正在重试(如果配置了重试机制) ## 监控和调试 ### 日志查看 应用会生成两种日志输出: 1. **控制台日志**: 实时显示应用运行状态 2. **文件日志**: 保存到 `app.log` 文件,支持自动轮转 ```bash # 查看实时日志 tail -f app.log # 查看 FastAPI 日志 make dev # 查看 Celery worker 日志(新终端) make worker ``` ### 健康检查 ```bash # 检查 API 健康状态 curl http://localhost:8000/ # 检查 Redis 连接 docker exec -it redis-cli ping ``` ### 常见问题排查 1. **任务一直处于 PENDING 状态** - 检查 Celery worker 是否正在运行 - 确认 Redis 服务是否可访问 2. **连接 Redis 失败** - 确保 Redis 容器正在运行 - 检查网络配置和端口映射 3. **任务执行失败** - 查看 Celery worker 的错误日志 - 检查任务代码中的异常处理 ### 性能监控 - 使用 `/result/{task_id}` 端点监控任务执行时间 - 查看 `app.log` 中的任务处理日志 - 监控 Redis 的内存和连接数 ## 日志系统 All logs are written to both console and `app.log` file with rotation. ## 部署和扩展 ### 生产环境部署 1. **使用 Docker Compose**: ```bash make up ``` 2. **水平扩展 Worker**: ```yaml # 在 docker-compose.yml 中增加 worker 实例 worker: deploy: replicas: 3 # 根据负载调整 ``` 3. **负载均衡**: - 使用 Nginx 或 Traefik 作为反向代理 - 配置多个 FastAPI 实例 ### 扩展建议 1. **添加任务优先级**: ```python @celery_app.task(bind=True, priority=10) def high_priority_task(self): # 高优先级任务 pass ``` 2. **实现任务进度跟踪**: ```python from celery import current_task @celery_app.task(bind=True) def task_with_progress(self): for i in range(100): current_task.update_state(state='PROGRESS', meta={'progress': i}) # 执行任务逻辑 ``` 3. **添加任务超时和重试**: ```python @celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3}) def robust_task(self): # 任务逻辑 pass ``` 4. **监控集成**: - 集成 Prometheus/Grafana 进行监控 - 使用 Celery Events 进行任务监控 ## 配置管理 Environment variables can be set in a `.env` file: - `REDIS_URL`: Redis connection URL (default: redis://localhost:6379/0) - `CELERY_BROKER_URL`: Celery broker URL (default: redis://localhost:6379/0) - `CELERY_RESULT_BACKEND`: Celery result backend URL (default: redis://localhost:6379/0) ## 开发指南 ### 添加新任务 1. 在 `app/tasks/tasks.py` 中定义新任务: ```python @celery_app.task(bind=True) def your_new_task(self, param1, param2): log.info(f"Executing your_new_task with {param1}, {param2}") # 任务逻辑 return result ``` 2. 在 `app/api/routes.py` 中添加对应的 API 端点: ```python @router.post("/your-task") async def submit_your_task(param1: str, param2: int): task = your_new_task.delay(param1, param2) return {"task_id": task.id, "status": "submitted"} ``` ### 代码规范 运行代码质量检查: ```bash make lint # 运行 ruff 代码检查 make format # 格式化代码 make pre-commit-run # 运行所有 pre-commit 检查 ``` 代码规范要求: - 使用 `uv run ruff check .` 检查代码质量 - 使用 `uv run ruff format .` 格式化代码 - 提交前运行 `make pre-commit-run` 确保所有检查通过 - 为所有新增功能添加适当的日志记录 ### 测试建议 - 使用 pytest 编写单元测试 - 测试异步任务的提交和结果查询 - 验证错误处理和边界情况 - 使用测试 Redis 实例避免影响生产环境 ## 贡献 欢迎提交 Issue 和 Pull Request!在提交前请确保: 1. 代码通过所有 linting 和格式化检查 2. 添加适当的测试用例 3. 更新相关文档 4. 遵循现有的代码风格和架构模式 ## License MIT License