Golang Zookeeper 客户端操作

golang zookeeper 常用客户端

连接zookeeper

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
    "fmt"
    "time"

    "github.com/go-zookeeper/zk"
)

func main() {
    c, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second) //*10)
    if err != nil {
        panic(err)
    }
    children, stat, ch, err := c.ChildrenW("/")
    if err != nil {
        panic(err)
    }
    fmt.Printf("%+v %+v\n", children, stat)
    e := <-ch
    fmt.Printf("%+v\n", e)
}

增删改查

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package main

import (
    "fmt"
    "time"

    "github.com/go-zookeeper/zk"
)

var (
    path = "/test"
)

// 增
func add(conn *zk.Conn) {
    var data = []byte("test value")
    // flags有4种取值:
    // 0:永久,除非手动删除
    // zk.FlagEphemeral = 1:短暂,session断开则该节点也被删除
    // zk.FlagSequence  = 2:会自动在节点后面添加序号
    // 3:Ephemeral和Sequence,即,短暂且自动添加序号
    var flags int32 = 0
    // 获取访问控制权限
    acls := zk.WorldACL(zk.PermAll)
    s, err := conn.Create(path, data, flags, acls)
    if err != nil {
        fmt.Printf("创建失败: %v\n", err)
        return
    }
    fmt.Printf("创建: %s 成功", s)
}

// 查
func get(conn *zk.Conn) {
    data, _, err := conn.Get(path)
    if err != nil {
        fmt.Printf("查询%s失败, err: %v\n", path, err)
        return
    }
    fmt.Printf("%s 的值为 %s\n", path, string(data))
}

// 删改与增不同在于其函数中的version参数,其中version是用于 CAS支持
// 可以通过此种方式保证原子性
// 改
func modify(conn *zk.Conn) {
    new_data := []byte("hello zookeeper")
    _, sate, _ := conn.Get(path)
    _, err := conn.Set(path, new_data, sate.Version)
    if err != nil {
        fmt.Printf("数据修改失败: %v\n", err)
        return
    }
    fmt.Println("数据修改成功")
}

// 删
func del(conn *zk.Conn) {
    _, sate, _ := conn.Get(path)
    err := conn.Delete(path, sate.Version)
    if err != nil {
        fmt.Printf("数据删除失败: %v\n", err)
        return
    }
    fmt.Println("数据删除成功")
}

func main() {
    // 创建zk连接地址
    hosts := []string{"127.0.0.1:2181"}
    // 连接zk
    conn, _, err := zk.Connect(hosts, time.Second*5)
    defer conn.Close()
    if err != nil {
        fmt.Println(err)
        return
    }

    /* 增删改查 */
    add(conn)
    get(conn)
    modify(conn)
    del(conn)
    get(conn)
}

watch机制

go-zookeeper通过Event结构来通知

  1. 全局监听
    1. 调用zk.WithEventCallback(callback)设置回调
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main

import (
    "fmt"
    "time"

    "github.com/go-zookeeper/zk"
)

var (
    hosts       = []string{"127.0.0.1:2181"}
    path        = "/wtzk"
    flags int32 = zk.FlagEphemeral
    data        = []byte("zk data 001")
    acls        = zk.WorldACL(zk.PermAll)
)

func main() {
    // 创建监听的option,用于初始化zk
    eventCallbackOption := zk.WithEventCallback(callback)
    // 连接zk
    conn, _, err := zk.Connect(hosts, time.Second*5, eventCallbackOption)
    defer conn.Close()
    if err != nil {
        fmt.Println(err)
        return
    }

    // 开始监听path
    _, _, _, err = conn.ExistsW(path)
    if err != nil {
        fmt.Println(err)
        return
    }

    // 触发创建数据操作
    create(conn, path, data)

    //再次监听path
    _, _, _, err = conn.ExistsW(path)
    if err != nil {
        fmt.Println(err)
        return
    }
    // 触发删除数据操作
    del(conn, path)
}

// zk watch 回调函数
func callback(event zk.Event) {
    // zk.EventNodeCreated
    // zk.EventNodeDeleted
    fmt.Println("###########################")
    fmt.Println("path: ", event.Path)
    fmt.Println("type: ", event.Type.String())
    fmt.Println("state: ", event.State.String())
    fmt.Println("---------------------------")
}

// 创建数据
func create(conn *zk.Conn, path string, data []byte) {
    _, err := conn.Create(path, data, flags, acls)
    if err != nil {
        fmt.Printf("创建数据失败: %v\n", err)
        return
    }
    fmt.Println("创建数据成功")
}

// 删除数据
func del(conn *zk.Conn, path string) {
    _, stat, _ := conn.Get(path)
    err := conn.Delete(path, stat.Version)
    if err != nil {
        fmt.Printf("删除数据失败: %v\n", err)
        return
    }
    fmt.Println("删除数据成功")
}
  1. 部分监听
    1. 调用conn.ExistsW(path) 或GetW(path)为对应节点设置监听,该监听只生效一次
    2. 开启一个协程处理chanel中传来的event事件(防止阻塞)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package main

import (
    "fmt"
    "time"

    "github.com/go-zookeeper/zk"
)

var (
    hosts       = []string{"127.0.0.1:2181"}
    path        = "/wtzk"
    flags int32 = zk.FlagEphemeral
    data        = []byte("zk data 001")
    acls        = zk.WorldACL(zk.PermAll)
)

func main() {
    // 连接zk
    conn, _, err := zk.Connect(hosts, time.Second*5)
    defer conn.Close()
    if err != nil {
        fmt.Println(err)
        return
    }

    // 开始监听path
    _, _, event, err := conn.ExistsW(path)
    if err != nil {
        fmt.Println(err)
        return
    }

    // 协程调用监听事件,防止阻塞main函数
    go watchZkEvent(event)

    // 触发创建数据操作
    create(conn, path, data)

}

// zk 回调函数
func watchZkEvent(e <-chan zk.Event) {
    event := <-e
    fmt.Println("###########################")
    fmt.Println("path: ", event.Path)
    fmt.Println("type: ", event.Type.String())
    fmt.Println("state: ", event.State.String())
    fmt.Println("---------------------------")
}

// 创建数据
func create(conn *zk.Conn, path string, data []byte) {
    _, err := conn.Create(path, data, flags, acls)
    if err != nil {
        fmt.Printf("创建数据失败: %v\n", err)
        return
    }
    fmt.Println("创建数据成功")
}
  1. 如果即设置了全局监听又设置了部分监听,那么最终是都会触发的,并且全局监听在先执行
  2. 如果设置了监听子节点,那么事件的触发是先子节点后父节点

客户端随机hostname支持

最终就是Round Robin策略

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var hosts = []string{"host01:2181", "host02:2181", "host03:2181",}
hostPro := new(zk.DNSHostProvider)
//先初始化
err := hostPro.Init(hosts)

if err != nil {
    fmt.Println(err)
    return
}
//获得host
server, retryStart := hostPro.Next()
fmt.Println(server, retryStart)
//连接成功后会调用
hostPro.Connected()

来源