Zookeeper Golang客户端:go-zookeeper的基本使用

1.连接到ZK server端

var hosts = []string{"localhost:8000"}//server端host
conn, _, err := zk.Connect(hosts, time.Second*5)
defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}
	

2.增删改查

增加节点

var path="/test"
var data=[]byte("hello zk")
var flags=0
//flags有4种取值:
//0:永久,除非手动删除
//zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除
//zk.FlagSequence  = 2:会自动在节点后面添加序号
//3:Ephemeral和Sequence,即,短暂且自动添加序号
var acls=zk.WorldACL(zk.PermAll)//控制访问权限模式

p,err_create:=conn.Create(path,data,flags,acls)
if err_create != nil {
	fmt.Println(err_create)
	return
}
fmt.Println("create:",p)

//输出 create:/test

删改与增不同在于其函数中的version参数,其中version是用于 CAS支持
可以通过此种方式保证原子性

func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error)
func (c *Conn) Delete(path string, version int32) error

3.watch机制

Java API中是通过Watcher实现的,在go-zookeeper中则是通过Event。道理都是一样的
全局监听:
1.调用zk.WithEventCallback(callback)设置回调

//如下:
package main

import (
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"time"
)

var hosts = []string{"localhost:8000"}

var path1 = "/whatzk"

var flags int32 = zk.FlagEphemeral
var data1 = []byte("hello,this is a zk go test demo!!!")
var acls = zk.WorldACL(zk.PermAll)

func main() {
	option := zk.WithEventCallback(callback)

	conn, _, err := zk.Connect(hosts, time.Second*5, option)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}
	

	_, _, _, err = conn.ExistsW(path1)
	if err != nil {
		fmt.Println(err)
		return
	}

	create(conn, path1, data1)
	
	time.Sleep(time.Second * 2)

	_, _, _, err = conn.ExistsW(path1)
	if err != nil {
		fmt.Println(err)
		return
	}
	delete(conn, path1)


}

func callback(event zk.Event) {
	fmt.Println("*******************")
	fmt.Println("path:", event.Path)
	fmt.Println("type:", event.Type.String())
	fmt.Println("state:", event.State.String())
	fmt.Println("-------------------")
}

func create(conn *zk.Conn, path string, data []byte) {
	_, err_create := conn.Create(path, data, flags, acls)
	if err_create != nil {
		fmt.Println(err_create)
		return
	}
	
}

//输出:
*******************
path: 
type: EventSession
state: StateConnecting
-------------------
*******************
path: 
type: EventSession
state: StateConnected
-------------------
*******************
path: 
type: EventSession
state: StateHasSession
-------------------
*******************
path: /whatzk
type: EventNodeCreated
state: Unknown
-------------------
*******************
path: /whatzk
type: EventNodeDeleted
state: Unknown
-------------------

部分监听:
1.调用conn.ExistsW(path) 或GetW(path)为对应节点设置监听,该监听只生效一次
2.开启一个协程处理chanel中传来的event事件
(注意:watchCreataNode一定要放在一个协程中,不能直接在main中调用,不然会阻塞main)

//部分代码如下:
func main() {
	conn, _, err := zk.Connect(hosts, time.Second*5)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}
	

	_, _, ech, err := conn.ExistsW(path1)
	if err != nil {
		fmt.Println(err)
		return
	}
	go watchCreataNode(ech)
	create(conn, path1, data1)

	
}

func watchCreataNode(ech <-chan zk.Event){
	event:=<-ech
	fmt.Println("*******************")
	fmt.Println("path:", event.Path)
	fmt.Println("type:", event.Type.String())
	fmt.Println("state:", event.State.String())
	fmt.Println("-------------------")
}

//输出如下:
*******************
path: /whatyy
type: EventNodeCreated
state: Unknown
-------------------

注意:

1.如果即设置了全局监听有设置了部分监听,那么最终是都会触发的,并且全局监听在先执行
2.如果设置了监听子节点,那么事件的触发是先子节点后父节点

4.客户端随机hostname支持

ZK Java client端,相关链接:
(http://www.jianshu.com/p/1068d0896e65)
最终就是Round Robin策略

//使用步骤如下:(相关代码位于dnshostprovider.go中)

var hosts = []string{"host1:8000","host2:8000","host3:8000"}
hostPro:=new(zk.DNSHostProvider)
err:=hostPro.Init(hosts)//先初始化
if err != nil {
	fmt.Println(err)
	return
}
server,retryStart:=hostPro.Next()//获得host
...
hostPro.Connected()  //连接成功后会调用
}


//上面的一系列步骤都集成在func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error)中
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐