zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Apache Curator异步API和事务API

2023-09-11 14:18:40 时间

原生API中基本上所有的操作都有提供异步操作,Curator也有提供异步操作的API。

异步操作
在使用以上针对节点的操作API时,我们会发现每个接口都有一个inBackground()方法可供调用。此接口就是Curator提供的异步调用入口。对应的异步处理接口为BackgroundCallback。此接口指提供了一个processResult的方法,用来处理回调结果。其中processResult的参数event中的getType()包含了各种事件类型,getResultCode()包含了各种响应码。

/**
* Called when the async background operation completes
*
* @param client the client
* @param event operation result details
* @throws Exception errors
*/
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;

事件类型

getType(),代表本次事件的类型,主要有

CREATE、DELETE、EXISTS、GET_DATA、SET_DATA、CHILDREN、SYNC、GET_ACL、WATCHED和CLOSING 。

分别代表

curatorFramework.create()、
curatorFramework.delete()、
curatorFramework.checkExists()、
curatorFramework.getData()、
curatorFramework.setData()、
curatorFramework.getChildren()、
curatorFramework.sync()、
curatorFramework.getACL()、
usingWatcher()、
watched()。

响应码
响应码用于标识事件的结果状态,所有响应码都被定义在
org.apache.zookeeper.KeeperException.Code类中,比较常见的响应码有OK(0),CONNECTIONLOSS (-4),NODEEXISTS (-110),SESSIONEXPIRED (-112),分别代表调用成功,客户端与服务端连接已断开,指定节点已存在,会话已超时,还有很多错误码可以查阅Code类中的代码。

inBackground的方法参数列表:

public T inBackground();
public T inBackground(Object context);
public T inBackground(BackgroundCallback callback);
public T inBackground(BackgroundCallback callback, Object context);
public T inBackground(BackgroundCallback callback, Executor executor);
public T inBackground(BackgroundCallback callback, Object context, Executor executor);

context参数:传给服务端的参数,会在异步通知中传回来

executor参数:此接口还允许传入一个Executor实例,用一个专门线程池来处理返回结果之后的业务逻辑。

异步创建节点的例子,其他节点类似:

public static void main(String[] args) throws Exception {

    String connectionString = "192.168.58.42:2181";
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);
    CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
    curatorFramework.start();

    //=========================创建节点=============================

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    curatorFramework.create()
            .creatingParentsIfNeeded()//递归创建,如果没有父节点,自动创建父节点
            .withMode(CreateMode.PERSISTENT)//节点类型,持久节点
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("===>响应码:" + event.getResultCode()+",type:" + event.getType());
                    System.out.println("===>Thread of processResult:"+Thread.currentThread().getName());
                    System.out.println("===>context参数回传:" + event.getContext());
                }
            },"传给服务端的内容,异步会传回来", executorService)
            .forPath("/node10","123456".getBytes());
    Thread.sleep(3000);


    curatorFramework.create()
            .creatingParentsIfNeeded()//递归创建,如果没有父节点,自动创建父节点
            .withMode(CreateMode.PERSISTENT)//节点类型,持久节点
            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同
            .inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("===>响应码:" + event.getResultCode()+",type:" + event.getType());
                    System.out.println("===>Thread of processResult:"+Thread.currentThread().getName());
                    System.out.println("===>context参数回传:" + event.getContext());
                }
            },"传给服务端的内容,异步会传回来")
            .forPath("/node10","123456".getBytes());
    Thread.sleep(3000);
    executorService.shutdown();
}

运行程序,输出结果如下:

===>响应码:0,type:CREATE
===>Thread of processResult:pool-3-thread-1
===>context参数回传:传给服务端的内容,异步会传回来
===>响应码:-110,type:CREATE
===>Thread of processResult:main-EventThread
===>context参数回传:传给服务端的内容,异步会传回来

上面这个程序使用了异步接口inBackground来创建节点,前后两次调用,创建的节点名相同。从两次返回的event可以看出,第一次返回的响应码是0,表明此次次调用成功,即创建节点成功;而第二次返回的响应码是-110,表明该节点已经存在,无法重复创建。这些响应码和ZooKeeper原生的响应码是一致的。

注意:如果自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,那么就会使用Zookeeper的EventThread线程对事件进行串行处理.在ZooKeeeper中,所有的异步通知事件都是由EventThread这个线程来处理的——EventThread线程用于穿行处理所有的事件通知。EventThread的“串行处理机制”在绝大部分应用场景下能够保证对事件处理的顺序性,但这个特性也有其弊端,就是一旦碰上一个复杂的处理单元,就会消耗过多的处理时间,从而影响对其他事件得分处理。因此,在上面的inBacigorund接口中,允许用户传入一个Executor实例,这样一来,就可以把那些复杂的事件处理放到一个专门的线程池中去。

事务操作
此外,Curator还支持事务,一组crud操作同生同灭。代码如下:

public class TransactionExamples {

    public static void main(String[] args) throws Exception {
        String connectionString = "192.168.58.42:2181";
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
        client.start();

        //开始事务操作
        CuratorOp createParentNode = client.transactionOp().create().forPath("/a", "some data".getBytes());
        CuratorOp createChildNode = client.transactionOp().create().forPath("/a/path", "other data".getBytes());
        CuratorOp setParentNode = client.transactionOp().setData().forPath("/a", "other data".getBytes());
        CuratorOp deleteParent = client.transactionOp().delete().forPath("/a");

        Collection<CuratorTransactionResult>    results = client.transaction().forOperations(createParentNode, createChildNode, setParentNode,deleteParent);

        for ( CuratorTransactionResult result : results ){
            System.out.println(result.getForPath() + " - " + result.getType());
        }
    }
}

上面的例子最后会抛异常,由于最后一步delete的节点不存在,所以整个事务commit失败。

注意:事务操作的时候不支持自动创建父节点,也就是说你想创建的节点如果是多层的,那么父节点一定要存在才可以。