Running Pulsar in Docker for Learning

Starting in Docker

Start Pulsar in standalone mode:

docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:3.2.2 \
bin/pulsar standalone

After starting it, the container turns out to have failed:

docker ps -a
CONTAINER ID   IMAGE                       COMMAND                   CREATED         STATUS                       PORTS     NAMES
f4679e187635   apachepulsar/pulsar:3.2.2   "bin/pulsar standalo…"   2 minutes ago   Exited (137) 2 minutes ago             stoic_liskov

Exit code 137 means the container was terminated by a SIGKILL signal (137 = 128 + 9).

This usually happens when the Docker environment has too few resources allocated, for example not enough memory or CPU (the process then gets OOM-killed), or when the container is not run in daemon (detached) mode and its foreground session is terminated.

If the cause is insufficient resources, raise the limits for the container, for example:

docker run -it --memory=4g --cpus=4 \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:3.2.2 \
bin/pulsar standalone

To run the container in daemon (detached) mode, start the image with -d:

docker run -d -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:3.2.2 bin/pulsar standalone

If Docker runs inside a colima VM, the VM's resources can be changed like this:

colima start --edit
INFO[0000] editing in vim
colima is currently running, restart to apply changes? [y/N] y
INFO[0013] stopping colima
INFO[0013] stopping ...                                  context=docker
INFO[0015] stopping ...                                  context=vm
INFO[0019] done
INFO[0022] starting colima
INFO[0022] runtime: docker
INFO[0022] starting ...                                  context=vm
INFO[0049] provisioning ...                              context=docker
INFO[0050] starting ...                                  context=docker
INFO[0054] done

Note: after this change the existing images and containers are removed, so the image has to be pulled again.

The resources after the change:

colima list
PROFILE      STATUS     ARCH      CPUS    MEMORY    DISK     RUNTIME    ADDRESS
default      Running    x86_64    2       4GiB      10GiB    docker

After the restart, run the image again:

docker ps -a
CONTAINER ID   IMAGE                       COMMAND                   CREATED              STATUS              PORTS                                                                                  NAMES
50fa87e4a200   apachepulsar/pulsar:3.2.2   "bin/pulsar standalo…"   About a minute ago   Up About a minute   0.0.0.0:6650->6650/tcp, :::6650->6650/tcp, 0.0.0.0:8080->8080/tcp, :::8080->8080/tcp   gifted_cohen

docker images -a
REPOSITORY            TAG       IMAGE ID       CREATED       SIZE
apachepulsar/pulsar   3.2.2     821180a05a49   5 weeks ago   939MB

The container now runs successfully. The Pulsar instance exposes two service URLs (a quick connectivity check follows the list):

  • pulsar://localhost:6650 (binary protocol endpoint used by client libraries)
  • http://localhost:8080 (HTTP service URL, which also serves the admin REST API)
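
As a minimal sanity check (not part of the original steps, written here as a Go sketch), the HTTP port can be probed with the admin REST endpoint /admin/v2/clusters, which on a standalone instance should return something like ["standalone"]:

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // Query the standalone instance's admin REST API.
    resp, err := http.Get("http://localhost:8080/admin/v2/clusters")
    if err != nil {
        fmt.Println("Pulsar admin endpoint is not reachable:", err)
        return
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    fmt.Println(string(body)) // expected: a JSON list such as ["standalone"]
}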

Using the Python Client

Install the Pulsar Python client library directly from PyPI:

pip install pulsar-client

Consume a message

Create a consumer and subscribe to the topic:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', subscription_name='my-sub')

try:
    while True:
        msg = consumer.receive()
        print("Received message: '%s'" % msg.data())
        consumer.acknowledge(msg)
finally:
    # The loop blocks until the process is interrupted (e.g. Ctrl-C),
    # after which the client is closed.
    client.close()

Produce a message

Start a producer to send some test messages:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')

for i in range(10):
    producer.send(('hello-pulsar-%d' % i).encode('utf-8'))

client.close()

Get the topic statistics

curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool

Topic

A topic is the carrier of message data. In Pulsar a topic can be split into multiple partitions; if none are configured, it has a single partition by default.

Topic names follow this URL scheme (a short sketch after the list shows a fully qualified name used from the Go client):

{persistent|non-persistent}://tenant/namespace/topic

  • persistent / non-persistent

    The type of the topic. Topics are either persistent or non-persistent, and persistent is the default. A persistent topic stores its messages on disk, whereas a non-persistent topic does not.

  • tenant

    The tenant of the topic within the Pulsar instance. Tenants are the basis of Pulsar's multi-tenancy and can be spread across clusters.

  • namespace

    The basic administrative unit for topics: it groups related topics together. Each tenant can have one or more namespaces.

  • topic

    The final part of the name. Topics in Pulsar are named channels that carry messages from producers to consumers.
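
For illustration, a minimal sketch (using the same my-topic as above) showing that the short name my-topic is equivalent to the fully qualified persistent://public/default/my-topic; either form can be passed to the Go client:

package main

import (
    "context"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // "my-topic" is shorthand for the fully qualified name below:
    // default tenant "public", default namespace "default", persistent type.
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "persistent://public/default/my-topic",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte("hello from a fully qualified topic name"),
    }); err != nil {
        log.Fatal(err)
    }
}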

Subscription modes

  • Exclusive

    A subscription is associated with only one consumer, and only that consumer receives all of the topic's messages. If the consumer fails, consumption stops.

  • Shared

    Messages are dispatched to the subscription's consumers in a round-robin (or custom) fashion, and each message is delivered to only one consumer. If a consumer disconnects, all messages sent to it but not yet acknowledged are redistributed to the remaining live consumers.

  • Failover

    With multiple consumers, they are ordered lexicographically and the first one is the only consumer receiving messages. If it disconnects, all unacknowledged and subsequent messages are handed to the next consumer in line.

  • Key_Shared

    With multiple consumers, messages are dispatched according to their key; messages with the same key always go to the same consumer (the sketch after this list shows how the type is selected).
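
A minimal sketch, assuming the my-topic used earlier and a placeholder subscription name mode-demo-sub: the Type field of pulsar.ConsumerOptions selects the mode, via the Go constants pulsar.Exclusive (the default), pulsar.Shared, pulsar.Failover and pulsar.KeyShared.

package main

import (
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Choose the subscription mode via the Type field.
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "my-topic",
        SubscriptionName: "mode-demo-sub", // placeholder name
        Type:             pulsar.KeyShared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()
}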

Using the Go Client

Go Client
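
Assuming the project already uses Go modules, the client library can be fetched with:

go get github.com/apache/pulsar-client-go/pulsar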

Create a Producer:

Create a producer that periodically sends messages to the given topic.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        fmt.Println("Create client failed", err)
        return
    }
    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-topic",
    })
    if err != nil {
        fmt.Println("Create producer failed", err)
        return
    }
    defer producer.Close()

    // Stop producing after 60 seconds.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
    defer cancel()

    // Send one message every 2 seconds.
    tick := time.NewTicker(2 * time.Second)
    defer tick.Stop()
    for range tick.C {
        _, err = producer.Send(ctx, &pulsar.ProducerMessage{
            Payload: []byte("hello"),
        })
        if err != nil {
            fmt.Println("Failed to publish message", err)
            break
        }
        fmt.Println("Published message")
    }
}

Create two Consumers:

Create two consumers with the Shared subscription type, each under its own subscription name (my-sub-1 and my-sub-2), so each subscription receives the topic's messages independently.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

// consume subscribes to my-topic under the given subscription name with the
// Shared type and prints every message it receives until ctx expires.
func consume(ctx context.Context, client pulsar.Client, subName string) {
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "my-topic",
        SubscriptionName: subName,
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    for {
        // Receive with ctx so the loop stops once the 60-second deadline passes.
        msg, err := consumer.Receive(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return
            }
            log.Fatal(err)
        }

        fmt.Printf("Sub: %s -- Received message msgId: %#v -- content: '%s'\n",
            subName, msg.ID(), string(msg.Payload()))
        consumer.Ack(msg)
    }
}

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
    defer cancel()

    // Run one consumer in a goroutine and the other in the main goroutine.
    go consume(ctx, client, "my-sub-1")
    consume(ctx, client, "my-sub-2")
}

Run results

Start the consumers:

go run pulsar_consumer.go
INFO[0000] Connecting to broker                          remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established                    local_addr="127.0.0.1:52714" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready                           local_addr="127.0.0.1:52714" remote_addr="pulsar://localhost:6650"
INFO[0000] Connected consumer                            consumerID=2 name=lywal subscription=my-sub-1 topic="persistent://public/default/my-topic"
INFO[0000] Created consumer                              consumerID=2 name=lywal subscription=my-sub-1 topic="persistent://public/default/my-topic"
INFO[0000] Connected consumer                            consumerID=1 name=mtmoa subscription=my-sub-2 topic="persistent://public/default/my-topic"
INFO[0000] Created consumer                              consumerID=1 name=mtmoa subscription=my-sub-2 topic="persistent://public/default/my-topic"
Sub: my-sub-1 -- Received message msgId: &pulsar.trackingMessageID{messageID:(*pulsar.messageID)(0xc000222160), tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc0000e8000), receivedTime:time.Time{wall:0xc185a6e5906b8d60, ext:37137186794, loc:(*time.Location)(0x100b5c5c0)}} -- content: 'hello'
Sub: my-sub-2 -- Received message msgId: &pulsar.trackingMessageID{messageID:(*pulsar.messageID)(0xc000222140), tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc00031fe00), receivedTime:time.Time{wall:0xc185a6e5906b23e8, ext:37137159841, loc:(*time.Location)(0x100b5c5c0)}} -- content: 'hello'
Sub: my-sub-1 -- Received message msgId: &pulsar.trackingMessageID{messageID:(*pulsar.messageID)(0xc00002cda0), tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc0000e8000), receivedTime:time.Time{wall:0xc185a6e60b2cee70, ext:39049156028, loc:(*time.Location)(0x100b5c5c0)}} -- content: 'hello'

Both subscriptions receive messages. Note that my-sub-1 and my-sub-2 are two separate subscriptions, so each of them gets its own copy of every message. A Shared subscription load-balances messages only between consumers that attach to the same subscription name; in that case each message is consumed by exactly one of them, as in the sketch below.
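
A minimal sketch of that case (the subscription name my-shared-sub and the consumer names are placeholders): both consumers join the same Shared subscription, so the broker distributes the messages between them instead of giving each one a full copy.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
    defer cancel()

    // Two consumers attached to the SAME subscription name.
    for _, name := range []string{"consumer-a", "consumer-b"} {
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "my-topic",
            SubscriptionName: "my-shared-sub", // same subscription for both
            Name:             name,
            Type:             pulsar.Shared,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        go func(name string, c pulsar.Consumer) {
            for {
                msg, err := c.Receive(ctx)
                if err != nil {
                    return // context expired or consumer closed
                }
                fmt.Printf("%s got: '%s'\n", name, string(msg.Payload()))
                c.Ack(msg)
            }
        }(name, consumer)
    }

    <-ctx.Done()
}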

Start the producer:

go run pulsar_producer.go
INFO[0000] Connecting to broker                          remote_addr="pulsar://localhost:6650"
INFO[0000] TCP connection established                    local_addr="127.0.0.1:52734" remote_addr="pulsar://localhost:6650"
INFO[0000] Connection is ready                           local_addr="127.0.0.1:52734" remote_addr="pulsar://localhost:6650"
INFO[0000] Connected producer                            cnx="127.0.0.1:52734 -> 127.0.0.1:6650" epoch=0 topic="persistent://public/default/my-topic"
INFO[0000] Created producer                              cnx="127.0.0.1:52734 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-0-1 topic="persistent://public/default/my-topic"
Published message
Published message
Published message

References