php 连接zookeeper实例
1、安装成功zookeeper后,在zookeeper 的bin目录下有启动相应的启动脚本启动Server ./zkServer.sh start启动client:(*注:cli需要安装java) zkCli.sh2、PHP实例:class ZookeeperDemo e
·
1、安装成功zookeeper后,在zookeeper 的bin目录下有启动相应的启动脚本
启动Server
./zkServer.sh start
启动client:(*注:cli需要安装java)
zkCli.sh
2、PHP实例:
<?php
class
ZookeeperDemo
extends
Zookeeper {
public function
watcher( $i, $type, $key ) {
echo
"Insider Watcher\n"
;
// Watcher gets consumed so we need to set a new one
$this->get(
'/test'
,
array
($this,
'watcher'
) );
}
}
$zoo =
new
ZookeeperDemo(
'127.0.0.1:2181'
);
$zoo->get(
'/test'
,
array
($zoo,
'watcher'
) );
while
(
true
) {
echo
'.'
;
sleep(2);
}
leader与worker任务的分配:
可以启动3个php进程,查看脚本的运行。
进程1:
[root@localhost zookeeper]# php -f worker.php
I'm registred as: w-0000000010
Nobody here, I'm the leader
Leading
进程2:
[daniel.luo@localhost zookeeper]$ php -f worker.php
I'm registred as: w-0000000011
I'm watching w-0000000010
Working
进程3:
[daniel.luo@localhost zookeeper]$ php -f worker.php
I'm registred as: w-0000000012
I'm watching w-0000000011
Working
ctrl + c 关闭leader进程后,会发现进程2与3会选举出新的leader
leader与worker任务的分配:
class
Worker
extends
Zookeeper {
const
CONTAINER =
'/cluster'
;
protected
$acl
=
array
(
array
(
'perms'
=> Zookeeper::
PERM_ALL
,
'scheme'
=>
'world'
,
'id'
=>
'anyone'
) );
private
$isLeader
=
false
;
private
$znode
;
public function
__construct( $host =
''
, $watcher_cb =
null
, $recv_timeout = 10000 ) {
parent
:: __construct( $host, $watcher_cb, $recv_timeout );
}
public function
register() {
if
( ! $this->exists(
self
::
CONTAINER
) ) {
$this->create(
self
::
CONTAINER
,
null
, $this->
acl
);
}
$this->znode = $this->create(
self
::CONTAINER .
'/w-'
,
null
,
$this->acl,
Zookeeper::
EPHEMERAL | Zookeeper::SEQUENCE );
$this->
znode
= str_replace(
self
::
CONTAINER
.
'/'
,
''
, $this->
znode
);
printf(
"I'm registred as: %s\n"
, $this->
znode
);
$watching = $this->watchPrevious();
if
( $watching == $this->
znode
) {
printf(
"Nobody here, I'm the leader\n"
);
$this->setLeader(
true
);
}
else
{
printf(
"I'm watching %s\n"
, $watching );
}
}
public function
watchPrevious() {
$workers = $this->getChildren(
self
::
CONTAINER
);
sort( $workers );
$size = sizeof( $workers );
for
( $i = 0 ; $i < $size ; $i++ ) {
if
( $this->
znode
== $workers[ $i ] ) {
if
( $i > 0 ) {
$this->get(
self
::
CONTAINER
.
'/'
. $workers[ $i - 1 ],
array
( $this,
'watchNode'
) );
return
$workers[ $i - 1 ];
}
return
$workers[ $i ];
}
}
throw new
Exception( sprintf(
"Something went very wrong! I can't find myself: %s/%s"
,
self
::
CONTAINER
,
$this->
znode
) );
}
public function
watchNode( $i, $type, $name ) {
$watching = $this->watchPrevious();
if
( $watching == $this->
znode
) {
printf(
"I'm the new leader!\n"
);
$this->setLeader(
true
);
}
else
{
printf(
"Now I'm watching %s\n"
, $watching );
}
}
public function
isLeader() {
return
$this->
isLeader
;
}
public function
setLeader($flag) {
$this->
isLeader
= $flag;
}
public function
run() {
$this->register();
while
(
true
) {
if
( $this->isLeader() ) {
$this->doLeaderJob();
}
else
{
$this->doWorkerJob();
}
sleep( 2 );
}
}
public function
doLeaderJob() {
echo
"Leading\n"
;
}
public function
doWorkerJob() {
echo
"Working\n"
;
}
}
$worker =
new
Worker(
'127.0.0.1:2181'
);
$worker->run();
进程1:
[root@localhost zookeeper]# php -f worker.php
I'm registred as: w-0000000010
Nobody here, I'm the leader
Leading
进程2:
[daniel.luo@localhost zookeeper]$ php -f worker.php
I'm registred as: w-0000000011
I'm watching w-0000000010
Working
进程3:
[daniel.luo@localhost zookeeper]$ php -f worker.php
I'm registred as: w-0000000012
I'm watching w-0000000011
Working
ctrl + c 关闭leader进程后,会发现进程2与3会选举出新的leader
更多推荐
已为社区贡献1条内容
所有评论(0)