zl程序教程

您现在的位置是:首页 >  系统

当前栏目

Linux(CentOS)中常用软件安装,使用及异常——Zookeeper, Kafka

2023-09-27 14:29:21 时间

可以在http://zookeeper.apache.org/这里下载需要的安装包。这里采用的是zookeeper-3.4.6.tar.gz安装包。
1 首先将安装包解压:


[root@hidden util]# tar -zxvf zookeeper-3.4.6.tar.gz

[root@hidden util]# cd zookeeper-3.4.6

[root@hidden zookeeper-3.4.6]#ls

bin dist-maven LICENSE.txt src

build.xml docs NOTICE.txt zookeeper-3.4.6.jar

CHANGES.txt ivysettings.xml README-packaging.txt zookeeper-3.4.6.jar.asc

conf ivy.xml README.txt zookeeper-3.4.6.jar,md5

contrib lib recipes zookeeper-3.4.6.jar.sha1

2 修改系统的环境变量


export ZOOKEEPER_INSTALL=/root/util/zookeeper-3.4.6 export PATH=$PATH:$ZOOKEEPER_INSTALL/bin [root@hidden zookeeper-3.4.6]# source /etc/profile

3 创建Zookeeper的配置文件


[root@hidden zookeeper-3.4.6]#cd conf

[root@hidden conf]# cp zoo_example.cfg zoo.cfg

4 配置zoo.cfg


tickTime=2000 #ZooKeeper服务器心跳时间,单位为ms

initLimit=10 #投票选举心leader的初始化时间

syncLimit=5 #leader与follower心跳检测最大容忍时间,响应超过syncLimit*tickTime,leader认为follower死掉,从服务器列表中删除follower

clientPort=2181 #端口

dataDir=/tmp/ZooKeeper/data #数据目录

dataLogDir=/tmp/ZooKeeper/log #日志目录

5 创建配置中的相应目录


7 可以使用ZooKeeper自带的客户端工具来查看ZooKeeper的节点建立情况(bin/zkCli.sh)
[图片]


ZooKeeper实现了一个层次命名空间的数据模型,也可以认为它就是一个小型的、精简的文件系统。它的每个节点称为znode, znode除了本身能够包含一部分数据之外,还能够拥有子节点,当节点上的数据发生变化,或者其子节点发生变化,基于watcher机制,会发出相应的通知给订阅其状态变化的客户端。
首先,实例化一个ZooKeeper对象,指定其三个参数。url为ZooKeeper服务器的地址。sessionTimeOut为会话的超时时间,ZooKeeper的会话超时时间的长度由客户端来确定,但是ZooKeeper的Server端会有两个配置,minSessionTimeout和maxSessionTimeout,minSessionTimeout的值默认为2倍的tickTime, maxSessionTimeout的值默认为20倍的tickTime,单位都是ms. tickTime也是服务端的一个配置项,是Server内部控制时间逻辑的最小时间单位,如果客户端发来的sessionTimeout超过minSessionTimeout~maxSessionTimeout这个范围,Server会自动取minSessionTimeout或者maxSessionTimeout作为sessionTimeout, 然后为这个Client新建一个session对象。最后一个参数为默认的watcher.如果包含boolean watch的读方法中传入true, 则将默认的watcher注册为所关注时间的watcher,如果传入false,则不注册任何watcher,这里暂且定为空。


ZooKeeper zooKeeper = new ZooKeeper(url,sesssionTimeout,null);//注意将安装目录下的zookeeper的jar包拷贝出来导入你的project的libs中;zookeeper的默认端口号为2181

1 创建节点
通过ZooKeeper的API新增一个znode节点,节点在被创建时,需要制定节点的路径(此处为/root)包含的字节数据,访问权限(如果不想设置,则制定为Ids.OPEN_ACL_UNSAFE),以及创建的节点类型,节点的类型如表所示:


CreateMode.PERSISTENT_SEQUENTIAL 持久节点,该节点在客户端断开后不会删除,并将在其名下附加一个单调递增数
CreateMode.EPHEMERAL_SEQUENTIAL 临时节点,该节点在客户端断开后删除,并将在其名下附加一个单调递增数
//创建/root节点,其包含的数据为“root data",访问权限为开放,所有人均可以访问,创建模式为持久化节点

zooKeeper.create("/root","root data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

2 删除节点
当不需要某个节点,或者某个节点上的信息已经失效时,使用delete方法可以将该节点删除,删除时需要制定节点的版本号version,如果设置为-1,则匹配所有的版本,ZooKeeper会比较删除的节点版本是否和服务器上的版本一致,如果不一致则抛出异常。
删除节点实例:


3 设置和获取节点内容
如果想在已有节点中保存数据,可以通过ZooKeeper的setData方法,将数据保存到节点上,也可以通过ZooKeeper的getData方法来获取该节点保存的数据,一个znode中最多能够保存1MB的数据。
设置和获取节点内容的实例。


//设置/root节点的数据,版本号为-1,如果匹配不到相应的节点,会抛出异常

zooKeeper.setData("/root","hello".getBytes(),-1);

//取得/root节点的数据,并反回其stat

Stat stat = new Stat();

byte[] data = zooKeeper.getData("/root",false,stat);

setData方法设置/root节点的数据为“hello”,getData方法取得root节点上保存的节点数据,false表示不使用默认的watcher,第三个参数为Stat, 表示节点的状态,是一个传出的参数,将会返回该节点当前的状态信息。
4 添加子节点
ZooKeeper支持在已有的节点下添加子节点,同样也是用使用create()方法,但是父节点必须存在,否则会跑出异常。
创建子节点的实例:

zooKeeper.create("/root","root data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

zooKeeper.create("/root/child","child data".getBytes(),Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

5 判断节点是否存在
当进行系统初始化时,或者当前需要给一个节点创建子节点时,通常需要判断系统中的一些节点是否存在。


判断/root/child1节点是否存在,如果存在,返回stat不为null,否则为null.
6 watcher的实现
当节点的状态发生变化,通过watcher机制,可以让客户端得到通知,watcher需要实现org.apache.ZooKeeper.Watcher接口。节点的状态变化主要包含如表所示的几种情况。


public class ZKWatcher implements Watcher{

@Override public void process(WacthedEvent event){

if(event.getType()==EventType.NodeDeleted)

//del

if(event.getType()==EventType.NodeChildrenChanged )

//del

if(event.getType()==EventType.NodeCreated)

//del

if(event.getType()==EventType.NodeDataChanged )

//del

}

需要注意的是,ZooKeeper的watcher是一次性的,也就是说,每次在处理完状态变化时间之后,需要重新注册watcher,这一点很让人抓狂。这个特性也使得在处理时间和重新加上watcher这段时间发生的节点状态变化将无法被感知。


1 ZooKeeer常常发生下面两种系统异常:
org.apache.ZooKeeper.KeeperException.ConnectionLossException,客户端与其中的一台服务器socket连接出现异常,连接丢失;
org.apache.ZooKeeper.KeeperException.SessionExpiredException, 客户端的session已经超过sessionTimeout,未进行任何操作。
ConnectionLossException异常可以通过重试进行处理,客户端会根据初始化ZooKeeper时传递的服务列表,自动尝试下一个服务端节点,而在这段时间内,服务端节点变更的事件就会丢失。
SessionExpiredException异常不能通过重试解决,需要应用重新创建一个新的客户端(new ZooKeeper()),这时所有的watcher和EPHEMERAL节点都将失效。
一般情况下,不采用原生态的API进行客户端操作,而是采用zkClient或者Curator进行操作,这个会在后续的文章中讲述。
2 异常

2016-08-04 20:12:54 -[INFO] - [Opening socket connection to server 10.101.139.86/10.101.139.86:2181. Will not attempt to authenticate using SASL (unknown error)] - [org.apache.zookeeper.ClientCnxn$SendThread:975]

2016-08-04 20:12:55 -[WARN] - [Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect] - [org.apache.zookeeper.ClientCnxn$SendThread:1102]

java.net.ConnectException: Connection refused: no further information

 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

 at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)

 at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)

 at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

原因:
1. 网络不通,端口不可用等;
2. zookeeper服务程序未正常启动;
3. 防火墙未关闭;


[root@hidden util]# tar -zxvf kafka_2.8.9-0.8.1.1

[root@hidden util]# cd kafka_2.8.9-0.8.1.1

[root@hidden util]# ls

bin config libs LICENSE logs NOTICE

/bin 启动和停止命令等
/config 配置文件
/libs 类库


bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

在producer端输入字符串并回车,查看consumer端是否显示。

关于kafka的介绍以及在java中如何操作kafka会在之后的博文中展开介绍。


linux安装zookeeper集群,包括集群启停脚本 #### 三台机器安装zookeeper集群 **注意事项:安装前三台机器一定要保证时钟同步** * 我这里是有3台服务器,分别hostname为node01、node02、node03。 * 三台机器已经配置好了ssh。 * zookeeper使用的是cdh5的zookeeper包,没有使用原生zookeeper项目的包。 * 三台机器已经进行了时钟同步。
【ZooKeeper】② ZooKeeper 基础(ZooKeeper 应用场景、Linux上安装和使用 ZooKeeper、ZooKeeper特点、Linux 上ZooKeeper 节点的增删改查) 常见的事件类型: NodeDataChanged - 节点数据改变事件 NodeChildrenChanged: 子节点数量改变事件 NodeCreated: 节点创建事件 数据获取一般绑定 NodeDataChanged 获取子节点列表一般绑定 NodeChildrenChanged