fastapi_celery/README.md
2025-12-25 16:43:26 +08:00

11 KiB
Raw Blame History

FastAPI Celery Example

一个基于 FastAPI 和 Celery 的异步任务处理示例项目,展示了如何在现代 Python Web 应用中实现高效的异步任务队列。

项目介绍

本项目是一个完整的 FastAPI + Celery 应用示例,演示了如何:

  • 使用 FastAPI 构建高性能的 REST API
  • 集成 Celery 进行异步任务处理
  • 使用 Redis 作为消息代理和结果后端
  • 实现统一的日志管理系统
  • 配置现代化的代码质量工具链
  • 使用容器化技术简化部署

为什么选择这个架构?

  • FastAPI: 现代化的异步 Web 框架,提供出色的性能和自动 API 文档生成
  • Celery: 强大的分布式任务队列,支持多种消息代理
  • Redis: 高速的内存数据库,适合作为任务队列的后端
  • Loguru: 简单易用的日志库,支持结构化日志和文件轮转
  • uv: 快速的 Python 包管理器,比 pip 更快
  • Ruff: 快速的 Python 代码检查和格式化工具
  • Docker: 容器化技术,确保环境一致性和易部署

Features

  • FastAPI web framework
  • Celery for asynchronous tasks
  • Redis as message broker and result backend
  • Loguru for unified logging
  • Docker containerization
  • Code quality with Ruff and pre-commit hooks
  • Dependency management with uv

Project Structure

.
├── app/
│   ├── api/
│   │   ├── __init__.py
│   │   └── routes.py          # API endpoints
│   ├── core/
│   │   ├── __init__.py
│   │   ├── celery_app.py      # Celery configuration
│   │   ├── config.py          # Application settings
│   │   └── logging.py         # Loguru setup
│   └── tasks/
│       ├── __init__.py
│       └── tasks.py           # Celery tasks
├── main.py                    # FastAPI application
├── pyproject.toml             # Project dependencies and ruff config
├── Dockerfile                 # Docker image
├── docker-compose.yml         # Multi-service setup
├── Makefile                   # Build and run commands
├── .pre-commit-config.yaml    # Pre-commit hooks
└── README.md                  # This file

Installation

Prerequisites

Local Development

  1. Clone the repository:
git clone <repository-url>
cd fastapi-celery-example
  1. Install dependencies:
make install
  1. Install pre-commit hooks:
make pre-commit-install

架构说明

系统架构

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   FastAPI App   │    │     Redis       │    │  Celery Worker  │
│                 │◄──►│  Message Broker │◄──►│                 │
│ • REST API      │    │  Result Backend │    │ • Task Processing│
│ • Task Submission│    │                 │    │ • Async Execution│
└─────────────────┘    └─────────────────┘    └─────────────────┘

工作流程

  1. 客户端请求: 用户通过 FastAPI 端点提交任务
  2. 任务入队: FastAPI 将任务发送到 Redis 消息队列
  3. 异步处理: Celery Worker 从队列中获取任务并异步执行
  4. 结果存储: 任务结果存储回 Redis
  5. 结果查询: 客户端可以通过任务 ID 查询执行结果

主要组件

  • app/api/routes.py: FastAPI 路由定义,处理 HTTP 请求
  • app/core/celery_app.py: Celery 应用配置
  • app/core/logging.py: 统一日志配置
  • app/tasks/tasks.py: 具体的任务实现
  • app/core/config.py: 应用配置管理

用法指南

Local Development

  1. Start Redis (required for Celery):
docker run -d -p 6379:6379 redis:7-alpine
  1. Run the FastAPI server:
make dev
  1. In another terminal, run the Celery worker:
make worker
  1. Visit http://localhost:8000 to access the API

Docker Deployment

  1. Build and start all services:
make up
  1. The API will be available at http://localhost:8000

  2. Stop services:

make down

API 端点详解

基础端点

  • GET / - 应用根端点,返回欢迎信息

任务提交端点

  • POST /add

    • 功能: 提交两个整数的加法任务
    • 请求体: {"x": int, "y": int}
    • 响应: {"task_id": "uuid", "status": "submitted", "message": "..."}
  • POST /long-task

    • 功能: 提交长时间运行的任务(用于演示异步处理)
    • 请求体: {"duration": int} (秒数)
    • 响应: {"task_id": "uuid", "status": "submitted", "message": "..."}

结果查询端点

  • GET /result/{task_id}
    • 功能: 查询任务执行结果
    • 响应状态:
      • pending: 任务正在执行
      • success: 任务成功完成,返回结果
      • failure: 任务执行失败,返回错误信息

详细使用示例

# Submit an addition task
curl -X POST "http://localhost:8000/add" \
     -H "Content-Type: application/json" \
     -d '{"x": 5, "y": 3}'

# Response: {"task_id": "uuid", "status": "submitted", "message": "Addition task submitted: 5 + 3"}

# Check result
curl "http://localhost:8000/result/uuid"

# Response: {"task_id": "uuid", "status": "success", "result": 8}

完整工作流程示例

# 1. 提交加法任务
curl -X POST "http://localhost:8000/add" \
     -H "Content-Type: application/json" \
     -d '{"x": 10, "y": 20}'

# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "submitted", "message": "Addition task submitted: 10 + 20"}

# 2. 立即查询结果(可能还在处理中)
curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000"
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "pending"}

# 3. 稍后再次查询(任务完成)
curl "http://localhost:8000/result/550e8400-e29b-41d4-a716-446655440000"
# 返回: {"task_id": "550e8400-e29b-41d4-a716-446655440000", "status": "success", "result": 30}

# 4. 提交长时间任务
curl -X POST "http://localhost:8000/long-task" \
     -H "Content-Type: application/json" \
     -d '{"duration": 5}'

# 5. 在另一个终端查看 Celery worker 日志
# Worker 将处理任务并记录执行过程

错误处理

当任务执行失败时,结果查询会返回:

{
  "task_id": "uuid",
  "status": "failure",
  "error": "具体错误信息"
}

任务状态说明

  • PENDING: 任务已提交,正在等待处理
  • PROGRESS: 任务正在执行中(如果任务支持进度报告)
  • SUCCESS: 任务成功完成,结果可用
  • FAILURE: 任务执行失败,包含错误信息
  • RETRY: 任务失败后正在重试(如果配置了重试机制)

监控和调试

日志查看

应用会生成两种日志输出:

  1. 控制台日志: 实时显示应用运行状态
  2. 文件日志: 保存到 app.log 文件,支持自动轮转
# 查看实时日志
tail -f app.log

# 查看 FastAPI 日志
make dev

# 查看 Celery worker 日志(新终端)
make worker

健康检查

# 检查 API 健康状态
curl http://localhost:8000/

# 检查 Redis 连接
docker exec -it <redis-container> redis-cli ping

常见问题排查

  1. 任务一直处于 PENDING 状态

    • 检查 Celery worker 是否正在运行
    • 确认 Redis 服务是否可访问
  2. 连接 Redis 失败

    • 确保 Redis 容器正在运行
    • 检查网络配置和端口映射
  3. 任务执行失败

    • 查看 Celery worker 的错误日志
    • 检查任务代码中的异常处理

性能监控

  • 使用 /result/{task_id} 端点监控任务执行时间
  • 查看 app.log 中的任务处理日志
  • 监控 Redis 的内存和连接数

日志系统

All logs are written to both console and app.log file with rotation.

部署和扩展

生产环境部署

  1. 使用 Docker Compose:

    make up
    
  2. 水平扩展 Worker:

    # 在 docker-compose.yml 中增加 worker 实例
    worker:
      deploy:
        replicas: 3  # 根据负载调整
    
  3. 负载均衡:

    • 使用 Nginx 或 Traefik 作为反向代理
    • 配置多个 FastAPI 实例

扩展建议

  1. 添加任务优先级:

    @celery_app.task(bind=True, priority=10)
    def high_priority_task(self):
        # 高优先级任务
        pass
    
  2. 实现任务进度跟踪:

    from celery import current_task
    
    @celery_app.task(bind=True)
    def task_with_progress(self):
        for i in range(100):
            current_task.update_state(state='PROGRESS', meta={'progress': i})
            # 执行任务逻辑
    
  3. 添加任务超时和重试:

    @celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
    def robust_task(self):
        # 任务逻辑
        pass
    
  4. 监控集成:

    • 集成 Prometheus/Grafana 进行监控
    • 使用 Celery Events 进行任务监控

配置管理

Environment variables can be set in a .env file:

  • REDIS_URL: Redis connection URL (default: redis://localhost:6379/0)
  • CELERY_BROKER_URL: Celery broker URL (default: redis://localhost:6379/0)
  • CELERY_RESULT_BACKEND: Celery result backend URL (default: redis://localhost:6379/0)

开发指南

添加新任务

  1. app/tasks/tasks.py 中定义新任务:

    @celery_app.task(bind=True)
    def your_new_task(self, param1, param2):
        log.info(f"Executing your_new_task with {param1}, {param2}")
        # 任务逻辑
        return result
    
  2. app/api/routes.py 中添加对应的 API 端点:

    @router.post("/your-task")
    async def submit_your_task(param1: str, param2: int):
        task = your_new_task.delay(param1, param2)
        return {"task_id": task.id, "status": "submitted"}
    

代码规范

运行代码质量检查:

make lint      # 运行 ruff 代码检查
make format    # 格式化代码
make pre-commit-run  # 运行所有 pre-commit 检查

代码规范要求:

  • 使用 uv run ruff check . 检查代码质量
  • 使用 uv run ruff format . 格式化代码
  • 提交前运行 make pre-commit-run 确保所有检查通过
  • 为所有新增功能添加适当的日志记录

测试建议

  • 使用 pytest 编写单元测试
  • 测试异步任务的提交和结果查询
  • 验证错误处理和边界情况
  • 使用测试 Redis 实例避免影响生产环境

贡献

欢迎提交 Issue 和 Pull Request在提交前请确保

  1. 代码通过所有 linting 和格式化检查
  2. 添加适当的测试用例
  3. 更新相关文档
  4. 遵循现有的代码风格和架构模式

License

MIT License