在Docker中启动用于学习
在Docker中启动
以独立模式standalone
启动 Pulsar
1
2
3
4
5
6
7
|
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:3.2.2 \
bin/pulsar standalone
|
启动后发现运行失败
1
2
3
4
|
docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f4679e187635 apachepulsar/pulsar:3.2.2 "bin/pulsar standalo…" 2 minutes ago Exited (137) 2 minutes ago stoic_liskov
|
显示退出信号是137,表示容器是被系统发送的SIGKILL信号所终止。
这种情况一般是当前docker环境分配的资源有限而导致,比如内存和CPU资源不足,或者容器没有守护进程模式下运行。
如果是资源不足,修改运行资源,比如:
1
2
3
4
5
6
7
|
docker run -it --memory=4g --cpus=4 \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:3.2.2 \
bin/pulsar standalone
|
守护模式如下:
1
|
docker run -d f4679e187635 /bin/bash -c 'while true; do sleep 1; done'
|
如果是通过colima运行的docker环境,可以这样修改
1
2
3
4
5
6
7
8
9
10
11
12
13
|
colima start --edit
INFO[0000] editing in vim
colima is currently running, restart to apply changes? [y/N] y
INFO[0013] stopping colima
INFO[0013] stopping ... context=docker
INFO[0015] stopping ... context=vm
INFO[0019] done
INFO[0022] starting colima
INFO[0022] runtime: docker
INFO[0022] starting ... context=vm
INFO[0049] provisioning ... context=docker
INFO[0050] starting ... context=docker
INFO[0054] done
|
注意:修改后镜像和容器实例会被删除,需要重新拉取。
修改后的资源情况如下:
1
2
3
|
colima list
PROFILE STATUS ARCH CPUS MEMORY DISK RUNTIME ADDRESS
default Running x86_64 2 4GiB 10GiB docker
|
重启后重新运行镜像
1
2
3
4
5
6
7
|
docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
50fa87e4a200 apachepulsar/pulsar:3.2.2 "bin/pulsar standalo…" About a minute ago Up About a minute 0.0.0.0:6650->6650/tcp, :::6650->6650/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp gifted_cohen
docker images -a
REPOSITORY TAG IMAGE ID CREATED SIZE
apachepulsar/pulsar 3.2.2 821180a05a49 5 weeks ago 939MB
|
容器运行成功,当前Pulsar实例的交互URL:
- pulsar://localhost:6650
- http://localhost:8080
使用python客户端
Install the Pulsar Python client library directly from PyPI:
1
|
pip install pulsar-client
|
Consume a message
Create a consumer and subscribe to the topic:
1
2
3
4
5
6
7
8
9
10
11
|
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', subscription_name='my-sub')
while True:
msg = consumer.receive()
print("Received message: '%s'" % msg.data())
consumer.acknowledge(msg)
client.close()
|
Produce a message
Start a producer to send some test messages:
1
2
3
4
5
6
7
8
9
|
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
producer.send(('hello-pulsar-%d' % i).encode('utf-8'))
client.close()
|
Get the topic statistics
1
|
curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool
|
Topic
消息数据的载体,在 Pulsar 中 Topic 可以指定分为多个 partition,如果不设置默认只有一个 partition。
Topic命名URL定义:
{persistent|non-persistent}://tenant/namespace/topic
-
persistent / non-persistent
表示主题的类型,主题分为持久化和非持久化主题,默认是持久化的类型。持久化的主题会将消息保存到磁盘上,而非持久化的主题就不会将消息保存到磁盘。
-
tenant
Pulsar 实例中主题的租户,租户对于 Pulsar 中的多租户至关重要,并且分布在集群中。
-
namespace
将相关联的 Topic 作为一个组来管理,是管理 Topic 的基本单元。每个租户可以有一个或多个命名空间。
-
topic
Pulsar 中的主题被命名为通道,用于将消息从生产者传输到消费者。
订阅消息模式
-
独占模式(Exclusive)
一个订阅只能与一个消费者关联,只有这个消费者可以接收到主题的全部消息,如果该消费者出现故障,消费过程将停止。
-
共享模式(Shared)
消息通过轮询机制(或自定义机制)分发给不同的消费者,每个消息只会被分发给一个消费者,如果消费者断开连接,所有发送给它但未被确认的消息将被重新分配给其他存活的消费者。
-
灾备模式(Failover)
当存在多个消费者时,消息将按字典顺序排序,第一个消费者将作为唯一接收消息的消费者,如果该消费者断开连接,所有未被确认的消息和后续的消息将重新分配给队列中的下一个消费者。
-
键共享模式(Key_Shared)
当存在多个消费者时,消息将根据其键进行分发,具有相同键的消息只会被分发到同一个消费者。
使用golang客户端
Go Client
Create a Producer:
创建一个生产者,定时向指定的Topic发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package main
import (
"fmt"
"context"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
if err != nil {
fmt.Println("Create producer failed", err)
return
}
defer producer.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
tick := time.NewTicker(2*time.Second)
for range tick.C {
_, err = producer.Send(ctx, &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
if err != nil {
fmt.Println("Failed to publish message", err)
break
} else {
fmt.Println("Published message")
}
}
}
|
Create two Consumer:
创建多个消费者,订阅模式为Shared,通过不同的订阅名称来负载均衡消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
package main
import (
"fmt"
"log"
"context"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
go func() {
subName := "my-sub-1"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: subName,
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for {
select {
case <-ctx.Done():
return
default:
}
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Sub: %s -- Received message msgId: %#v -- content: '%s'\n",
subName, msg.ID(), string(msg.Payload()))
}
}()
subName := "my-sub-2"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: subName,
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
for {
select {
case <-ctx.Done():
return
default:
}
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Sub: %s -- Received message msgId: %#v -- content: '%s'\n",
subName, msg.ID(), string(msg.Payload()))
}
}
|
运行结果
启动订阅
1
2
3
4
5
6
7
8
9
10
11
|
go run pulsar_consumer.go
INFO[0000] Connecting to broker remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established local_addr="127.0.0.1:52714" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready local_addr="127.0.0.1:52714" remote_addr="pulsar://localhost:6650"
INFO[0000] Connected consumer consumerID=2 name=lywal subscription=my-sub-1 topic="persistent://public/default/my-topic"
INFO[0000] Created consumer consumerID=2 name=lywal subscription=my-sub-1 topic="persistent://public/default/my-topic"
INFO[0000] Connected consumer consumerID=1 name=mtmoa subscription=my-sub-2 topic="persistent://public/default/my-topic"
INFO[0000] Created consumer consumerID=1 name=mtmoa subscription=my-sub-2 topic="persistent://public/default/my-topic"
Sub: my-sub-1 -- Received message msgId: &pulsar.trackingMessageID{messageID:(*pulsar.messageID)(0xc000222160), tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc0000e8000), receivedTime:time.Time{wall:0xc185a6e5906b8d60, ext:37137186794, loc:(*time.Location)(0x100b5c5c0)}} -- content: 'hello'
Sub: my-sub-2 -- Received message msgId: &pulsar.trackingMessageID{messageID:(*pulsar.messageID)(0xc000222140), tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc00031fe00), receivedTime:time.Time{wall:0xc185a6e5906b23e8, ext:37137159841, loc:(*time.Location)(0x100b5c5c0)}} -- content: 'hello'
Sub: my-sub-1 -- Received message msgId: &pulsar.trackingMessageID{messageID:(*pulsar.messageID)(0xc00002cda0), tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc0000e8000), receivedTime:time.Time{wall:0xc185a6e60b2cee70, ext:39049156028, loc:(*time.Location)(0x100b5c5c0)}} -- content: 'hello'
|
可以看到消息被均衡到不同订阅者,即每个消息只会被一个订阅者消费。
启动生产消息
1
2
3
4
5
6
7
8
9
|
go run pulsar_producer.go
INFO[0000] Connecting to broker remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established local_addr="127.0.0.1:52734" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready local_addr="127.0.0.1:52734" remote_addr="pulsar://localhost:6650"
INFO[0000] Connected producer cnx="127.0.0.1:52734 -> 127.0.0.1:6650" epoch=0 topic="persistent://public/default/my-topic"
INFO[0000] Created producer cnx="127.0.0.1:52734 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-0-1 topic="persistent://public/default/my-topic"
Published message
Published message
Published message
|
参考