为什么有FastStream

在处理消息队列(Kafka、RabbitMQ、Redis)时,你是否厌倦了重复编写底层连接、订阅、序列化和消息异常处理的代码?比如说之前可能用过的kafka-pythonpikaredis-py之类的库,虽然为业务的开发免去了重复造轮子,也具备了一定的灵活性,但我还是觉得缺乏现代开发所追求的开发效率和优雅性。恰巧最近在Github中无意看到了这个与消息处理相关的库-FastStream,在阅读完README的内容后,一时兴趣被调动了,于是花了几个小时深入文档并上手体验感受了一下,下面就来详细介绍一下。

FastStream是什么?

FastStream是一个用于构建与消息代理(如 Apache Kafka、RabbitMQ、NATS 和 Redis)交互的服务框架,让构建事件驱动的微服务变得轻而易举。它提供了统一的API,让开发者可以在不重写应用程序逻辑的情况切换消息组件,实现了开发者只需聚焦于业务逻辑的开发。它的设计哲学是:通过简单的装饰器,将函数变成强大的消息处理器。

其主要特点有:

  • 统一直观的API: 一次编写,随处运行,使用装饰器@broker.subscriber@broker.publisher 声明消息处理器。
  • 自动文档:自动生成AsyncAPI文档,使团队之间的集成无缝衔接。
  • 类型安全与自动解析:依托Pydantic,自动验证、序列化和解析消息体。
  • 强大的异步支持:原生支持async/await ,轻松处理高并发IO场景。
  • 开箱即用:直观的装饰器、内置依赖注入、测试工具和CLI工具等,使开发变得简单且快速。

核心特性

多消息代理支持

代理 说明 安装
Kafka 高吞吐量事件流 pip install faststream[kafka]
RabbitMQ 可靠消息队列 pip install faststream[rabbit]
NATS 轻量级消息传递 pip install faststream[nats]
Redis 简单发布/订阅 pip install faststream[redis]
1
2
3
4
5
6
7
8
9
10
# 只需更改导入和代理初始化
from faststream.kafka import KafkaBroker
# from faststream.rabbit import RabbitBroker
# from faststream.nats import NatsBroker
# from faststream.redis import RedisBroker

broker = KafkaBroker("localhost:9092")
# broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222/")
# broker = RedisBroker("redis://localhost:6379/")

强大的装饰器

  • @broker.subscriber(): 标记函数从队列/主题消费消息
  • @broker.publisher(): 标记函数以发布消息(通常与订阅者一起使用)

上述两个装饰器将普通的Python函数转换为消息处理器,自动处理消息的序列化、反序列化和路由。

1
2
3
4
5
@broker.subscriber("input-queue")
@broker.publisher("output-queue")
async def process_message(data: dict) -> dict:
# 您的业务逻辑在这里
return {"processed": True, "data": data}

Pydantic集成

了解FastAPI项目的人一定都知道Pydantic库,FastStream利用Pydantic进行消息验证和序列化。由于Pydantic是个独立且很热门的库,因此在此就不展开对Pydantic的使用,如需详细了解,请:点击前往

内置测试

测试分布式系统可能具有一定的挑战性,但使用FastStream通过内置测试工具使其变得简单。
内置TestBroker类:将消息处理器重定向到内存处理,让你无需运行实际代理即可进行测试逻辑。

1
2
3
4
5
6
7
8
import pytest
from faststream.rabbit import TestRabbitBroker

@pytest.mark.asyncio
async def test_my_handler():
async with TestRabbitBroker(broker) as test_broker:
await test_broker.publish({"test": "data"}, "input-queue")
# 您的断言在这里

依赖注入(DI)

FastStream包含了一个强大的依赖注入系统,用法和FastAPI中的Depends类似。

可以轻松的管理数据库连接、配置等资源,使得代码更易测试和维护。

1
2
3
4
5
6
7
8
9
from faststream import Depends

def get_db_connection():
# 模拟获取数据库连接
return "DatabaseConnection"

@broker.subscriber("order_topic")
async def process_order(event: dict, db: str = Depends(get_db_connection)):
print(f"使用 {db} 处理订单: {event}")

代码实践

10行代码实现一个Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 安装: pip install faststream-kafka
from faststream import FastStream
from faststream.kafka import KafkaBroker
from pydantic import BaseModel

# 定义数据模型
class UserCreated(BaseModel):
user_id: int
email: str

# 创建应用与Broker
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# 核心:用装饰器定义处理器
@broker.subscriber("user_created_topic")
async def handle_user_created_event(event: UserCreated): # 类型自动推断!
print(f"收到新用户注册事件!ID: {event.user_id}, 邮箱: {event.email}")
# 此处可写入你的业务逻辑,如发邮件、写数据库等
return {"status": "success"}

if __name__ == "__main__":
app.run()

实现用户注册与发送邮件

需求:构建一个用户注册服务,处理新用户注册并发送欢迎电子邮件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from faststream import FastStream, Logger
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel, Field, EmailStr

# 设置代理和应用程序
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

# 定义我们的消息结构
class UserRegistration(BaseModel):
username: str = Field(..., min_length=3)
email: EmailStr
age: int = Field(..., ge=18)

# 处理新用户注册
@broker.subscriber("user.registrations")
@broker.publisher("user.welcome_emails")
async def handle_registration(
user: UserRegistration,
logger: Logger
) -> dict:
logger.info(f"New user registered: {user.username}")

# 业务逻辑在这里
welcome_message = f"Welcome {user.username}! Thanks for joining us."

return {
"email": user.email,
"message": welcome_message
}

# 发送欢迎电子邮件
@broker.subscriber("user.welcome_emails")
async def send_welcome_email(email_data: dict, logger: Logger):
logger.info(f"Sending welcome email to {email_data['email']}")
# 电子邮件发送逻辑将在这里

总结

FastStream非常适合应用于:事件驱动架构(EDA)中的微服务、实时数据ETL管道、需要处理Kafka、RabbitMQ消息等场景中,它以简洁的语法(装饰器),严谨的类型系统(Pydantic),自动化的工具链以及广泛的生态融合(FastAPI),为Python在流式处理、异步消息处理等场景中注入了新活力。


本站由 BluesSen 使用 Stellar 1.33.1 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。

本站总访问量