Apache Pulsar 是灵活的发布-订阅消息系统(Flexible Pub/Sub messaging),采用计算与存储分离的架构。雅虎在 2013 年开始开发 Pulsar ,于 2016 年首次开源,目前是 Apache 软件基金会的顶级项目。Pulsar 具有支持多租户、持久化存储、多机房跨区域数据复制、高吞吐、低延迟等特性。

Pulsar 组件

Pulsar 集群主要由以下三部分组成:

  • Broker:Pulsar 的 broker 是一个无状态组件,本身不存储数据。主要负责处理 producer 和 consumer 的请求,消息的复制与分发,数据的计算。
  • Zookeeper:主要用于存储元数据、集群配置,任务的协调(例如哪个 broker 负责哪个 topic),服务的发现(例如 broker 发现 bookie 的地址)。
  • Bookeeper:主要用于数据的持久化存储。除了消息数据,cursors 也会被持久化到 Bookeeper,cursors 是消费端订阅消费的位移。Bookeeper 中每一个存储节点叫做 bookie。

concepts-architecture-overview

层级存储

层级存储的优势:

  • Infinite Stream: 以流的方式永久保存原始数据
  • 分区的容量不再受限
  • 充分利⽤云存储或现有的廉价存储 ( 例如 HDFS)
  • 数据统⼀表征:客户端无需关⼼数据究竟存储在哪⾥

Pulsar Functions

Pulsar Functions 是一个轻量级的计算框架,可以给用户提供一个部署简单、运维简单、API 简单的 FASS(Function as a service)平台。Pulsar Functions 提供基于事件的服务,支持有状态与无状态的多语言计算,是对复杂的大数据处理框架的有力补充。

pulsar functions

Pulsar IO

Pulsar IO 分为输入(Input)和输出(Output)两个模块,输入代表数据从哪里来,通过 Source 实现数据输入。输出代表数据要往哪里去,通过 Sink 实现数据输出。

plusar sink

Pulsar 基本概念

Producer & Consumer

身为⼀个 Pub/Sub 系统,⾸先的存在要素必然是 producer(⽣产者)。producer 发送数据给 Pulsar,将消息以 append 的形式追加到 topic 中。发送的数据是 key/value 形式的,并且数据会上 schema 的信息。Pulsar 会确保⼀个 producer 往 topic 发送的消息满⾜⼀定的 schema 格式。

Subscription

Pulsar ⾥将 consumer 接收消息的过程称之为:subscription(订阅),类似于 Kafka 的 consumer group(消费组)。⼀个订阅⾥的所有 consumer,会作为⼀个整体去消费这个 topic ⾥的所有消息。Pulsar 有四种订阅模式:独占(exclusive)、故障转移(failover)、共享(shared)、共享键(key_shared)。

Exclusive

在 exclusive 模式下,一个 subscription 只允许被一个 consumer 用于订阅 topic ,如果多个 consumer 使用相同的 subscription 去订阅同一个 topic,则会发生错误。exclusive 是默认的订阅模式。如下图所示,Consumer A-0 和 Consumer A-1 都使用了相同的 subscription(相同的消费组),只有 Consumer A-0 被允许消费消息。

Failover

在 failover 模式下,多个 consumer 允许使用同一个 subscription 去订阅 topic。但是对于给定的 topic,broker 将选择⼀个 consumer 作为该 topic 的主 consumer ,其他 consumer 将被指定为故障转移 consumer 。当主 consumer 失去连接时,topic 将被重新分配给其中⼀个故障转移 consumer ,⽽新分配的 consumer 将成为新的主 consumer 。发⽣这种情况时,所有未确认的消息都将传递给新的主 consumer ,这个过程类似于 Kafka 中的 consumer 组重平衡(rebalance)。

Shared

在 shared 模式下,多个 consumer 可以使用同一个 subscription 去订阅 topic。消息以轮询的方式分发给 consumer ,并且每条消费仅发送给一个 consumer 。当有 consumer 失去连接时,所有发送给该 consumer 但未被确认的消息将被重新安排,以便发送给该 subscription 上剩余的 consumer 。

shared 模式有以下限制:

  • 消息不能保证有序。
  • 不支持批量 ack。

Key_Shared

key_shared 是 Pulsar 2.4.0 以后⼀个新订阅模式。在 shared 模式下,多个 consumer 可以使用同一个 subscription 去订阅 topic。消息按照 key 分发给 consumer ,含有相同 key 的消息只被发送给同一个 consumer 。

key_shared 模式有以下限制:

  • 需要为每条消息指定一个 key 或者 orderingKey。
  • 不支持批量 ack。
  • producer 应该禁用 batch 或者使用基于 key 的 batch。

存储模型

  • 第一层抽象是 topic(partition),topic 是一个逻辑的概念,topic 是消息的集合,所有⽣产者的消息,都会归属到指定的 topic ⾥。所有在 topic ⾥的消息,会按照⼀定的规则,被切分成不同的分区(partition)。在 Kafka 中 partition 是真正的物理单元,但是在 Pulsar 中 partition 仍然是一个逻辑的概念。
  • Pulsar 把 partition 进一步分成多个分片(segment),segment 是 Pulsar 中真正的物理单元,Pulsar 中的数据是持久化在 Bookeeper 中的,segment 其实对应的就是 Bookeeper 中的 ledger。
  • 在分片中存储了更小粒度的 entry,entry 存储的是一条或者一个 batch 的消息,batch 是一次性批量提交多条消息。

Ack 机制

在 Pulsar 中支持了两种 ack 的机制,分别是单条 ack 和批量 ack。单条 ack(AckIndividual)是指 consumer 可以根据消息的 messageID 来针对某一个特定的消息进行 ack 操作;批量 ack(AckCumulative)是指一次 ack 多条消息。

消息生命周期

默认情况下,Pulsar Broker 会对消息做如下处理:

  • 当消息被 consumer 确认之后,会立即执行删除操作。
  • 对于未被确认的消息会存储到 backlog 中。

但是,很多线上的生产环境下,这种默认行为并不能满足我们的生产需求,所以,Pulsar 提供了如下配置策略来覆盖这些行为:

  • Retention 策略:用户可以将 consumer 已经确认的消息保留下来。
  • TTL 策略:对于未确认的消息,用户可以通过设置 TTL 来使未确认的消息到达已经确认的状态。

上述两种策略的设置都是在 NameSpace 的级别进行设置。

持久topic & 非持久topic

  • 持久topic: 持久topic是指存储在Pulsar中的消息将持久保存,即使没有消费者订阅该topic,消息仍然会被保留。这使得消费者在稍后加入时可以接收到之前发布的消息。
  • 非持久topic: 非持久topic则不会保存消息,如果没有活跃的订阅者,消息将会被丢弃。这种类型的topic适用于实时性要求高、不需要历史消息的场景。

使用Pulsar Python API的简单示例,演示如何创建生产者和消费者。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from pulsar import Client
 
# Pulsar 服务地址
service_url = 'pulsar://localhost:6650'
 
# 创建Pulsar客户端
client = Client(service_url)
 
# 创建一个生产者,发送消息到"persistent://public/default/my-topic"
producer = client.create_producer('persistent://public/default/my-topic')
 
# 发送一条消息
producer.send('Hello, Pulsar!')
 
# 关闭生产者
producer.close()
 
# 创建一个消费者,从"persistent://public/default/my-topic"接收消息
consumer = client.create_consumer('persistent://public/default/my-topic')
 
# 接收消息
msg = consumer.receive()
 
# 打印接收到的消息内容
print("Received message: ", msg.data)
 
# 关闭消费者
consumer.close()
 
# 关闭客户端
client.close()

参考