zl程序教程

您现在的位置是:首页 >  后端

当前栏目

【Netty】Netty 异步任务模型 及 Future-Listener 机制

异步 机制 Netty listener Future
2023-06-13 09:17:42 时间

文章目录

一、 Netty 模型


以服务器端为例

1 . 线程池 : Netty 模型核心就是两个线程池 , BossGroup 线程池 和 WorkerGroup 线程池 ;

① BossGroup 线程池 : 负责维护客户端连接操作 ;

② WorkerGroup 线程池 : 负责与客户端进行数据交互 ;

③ 线程池类型 : 上述两个线程池 ( BossGroup / WorkerGroup ) 都是 NioEventLoopGroup 类型的 ;

④ 线程池中的线程 : NioEventLoopGroup 线程池中维护了多个 NioEventLoop 线程 ;

2 . 线程池中的线程 : NioEventLoopGroup 线程池中维护了若干 NioEventLoop 线程 , 这相当于主从反应器 ( Reactor ) 模型中的反应器 , 每个 NioEventLoop 中都有一个 选择器 ( Selector ) , 用于监听 Socket IO 事件 , 如 建立连接 , 数据读写 等 ;

3 . NioEventLoop 工作流程 :

NioEventLoop 中可以按照一定顺序进行数据处理 , 如数据到来后 , 按照下面的流程执行一系列操作 ;

读取数据 -> 数据解码 -> 业务逻辑处理 -> 数据编码 -> 数据发送

4 . NioEventLoop 中封装内容 :

  • 选择器 Selector
  • 任务队列 TaskQueue
  • 调度任务队列 ScheduleTaskQueue
  • NIO 通道 NioChannel
  • 管道 ChannelPipeline

上面是 Netty 的模型的总体架构 , 下面重点介绍 Netty 模型中的异步模型 , Netty 中的每次绑定端口 , 连接远程端口 , 读写数据都要涉及到异步操作 ;

二、 异步模型


1 . 异步操作概念 : 调用者调用一个异步操作后 , 并不能马上知道该操作的返回值 , 该操作也不会马上执行完成 , 该操作完成后 , 会通过回调机制 , 如 通知 , 注册的回调函数等机制通知调用者 ;

2 . Netty 中的异步操作与 ChannelFuture 返回值 :

① 异步操作 : Netty 模型中凡是关于 IO 的操作 , 如绑定端口 ( Bind ) , 远程连接 ( Connect ) , 读取数据 ( Read ) , 写出数据 ( Write ) 等操作都是异步操作 ;

② 异步操作返回值 : 上述 IO 操作返回值都是 ChannelFuture 类型实例 , ChannelFuture 是异步 IO 操作的返回结果 ;

③ 在服务器端绑定端口号时 , 调用 Bootstrap 的 bind 方法 , 会返回 ChannelFuture 对象 ;

④ 在客户端调用 Bootstrap 的 connect 方法 , 也会返回 ChannelFuture 对象 ;

3 . Netty 中的异步操作机制 :

① Future-Listener 机制 : Future 表示当前不知道结果 , 在未来的某个时刻才知道结果 , Listener 表示监听操作 , 监听返回的结果 ;

② Netty 异步模型的两个基础 : Future ( ChannelFuture 未来知道结果 ) , Callback ( 监听回调 ) ;

4 . 以客户端写出数据到服务器端为例 :

客户端写出数据 : 客户端调用写出数据方法 ChannelFuture writeAndFlush(Object msg) , 向服务器写出数据 ;

操作耗时 : 假设在服务器中接收到该数据后 , 要执行一个非常耗时的操作才能返回结果 , 就是操作非常耗时 ;

客户端不等待 : 客户端这里写出了数据 , 肯定不能阻塞等待写出操作的结果 , 需要立刻执行下面的操作 , 因此该方法是异步的 ;

客户端监听 : writeAndFlush 方法返回一个 ChannelFuture 对象 , 如果客户端需要该操作的返回结果 , 那么通过 ChannelFuture 可以监听该写出方法是否成功 ;

5 . 异步操作返回结果 :

① 返回结果 : Future 表示异步 IO 操作执行结果 , 通过该 Future 提供的 检索 , 计算 等方法检查异步操作是否执行完成 ;

② 常用接口 : ChannelFuture 继承了 Future , 也是一个接口 , 可以为该接口对象注册监听器 , 当异步任务完成后会回调该监听器方法 ;

public interface ChannelFuture extends Future<Void>

6 . Future 链式操作 : 这里以读取数据 , 处理后返回结果为例 ;

  • 数据读取操作 ;
  • 对读取的数据进行解码处理 ;
  • 执行业务逻辑
  • 将数据编码 ;
  • 将编码后的数据写出 ;

上述

5

个步骤 , 每个数据处理操作 , 都有与之对应的 Handler 处理器 ;

异步机制 : 在 Handler 处理器中需要实现异步机制 , 一般使用 Callback 回调 , 或 Future 机制 ;

链式操作优势 : 上述的链式操作 , 简洁 , 高效 , 可以让开发者快速开发高性能 , 高可靠性服务器 , 只关注业务逻辑 , 不用过多的将精力浪费在网络基础功能开发上 ;

这里的网络基础功能就是高可靠性 , 高性能的网络传输模块 ;

三、 Future-Listener 机制


1 . Future-Listener 机制 :

① Future 返回值 : 在 Netty 中执行 IO 操作 , 如 bind , read , write , connect 等方法 , 会立刻返回 ChannelFuture 对象 ;

② ChannelFuture 返回时状态 : 调用 IO 方法后 , 立刻返回 ChannelFuture 对象 , 此时该操作未完成 ;

③ 注册监听器 : ChannelFuture 可以设置 ChannelFutureListener 监听器 , 监听该 IO 操作完成状态 , 如果 IO 操作完成 , 那么会回调其 public void operationComplete(ChannelFuture future) throws Exception 接口实现方法 ;

④ IO 操作执行状态判定 : 在 operationComplete 方法中通过 调用 ChannelFuture future 参数的如下方法 , 判定当前 IO 操作完成状态 ;

  • future.isDone() : IO 操作是否完成 ;
  • future.isSuccess() : IO 操作是否成功 ; ( 常用 )
  • future.isCancelled() : IO 操作是否被取消 ;
  • future.cause() : IO 操作的失败原因 ;

2 . IO 操作的同步与异步 :

① 同步 IO 操作 : BIO 中的同步 IO 操作 , 会阻塞当前的线程 , IO 操作返回前 , 处于阻塞状态 , 不能执行其它操作 ;

② 异步 IO 操作 : 异步 IO 操作不会阻塞当前的线程 , 调用 IO 操作之后 , 可以立即执行其它操作 , 不会阻塞当前线程 , 该机制非常适用于高并发的场景 , 开发稳定 , 并发 , 高吞吐量 的服务器 ;

3 . 核心代码示例 :

// 监听绑定操作的结果
// 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if(future.isDone()){
            System.out.println("绑定端口完成");
        }
        if(future.isSuccess()){
            System.out.println("绑定端口成功");
        }else{
            System.out.println("绑定端口失败");
        }
        if(future.isCancelled()){
            System.out.println("绑定端口取消");
        }
        System.out.println("失败原因 : " + future.cause());
    }
});

四、 Future-Listener 机制代码示例


1 . 代码示例 : 这里以服务器程序为例 , 客户端程序就不贴了 ;

package kim.hsl.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Netty 案例服务器端
 */
public class Server {
    public static void main(String[] args) {

        // 1. 创建 BossGroup 线程池 和 WorkerGroup 线程池, 其中维护 NioEventLoop 线程
        //     NioEventLoop 线程中执行无限循环操作

        // BossGroup 线程池 : 负责客户端的连接
        // 指定线程个数 : 客户端个数很少, 不用很多线程维护, 这里指定线程池中线程个数为 1
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // WorkerGroup 线程池 : 负责客户端连接的数据读写
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // 2. 服务器启动对象, 需要为该对象配置各种参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup) // 设置 主从 线程组 , 分别对应 主 Reactor 和 从 Reactor
                .channel(NioServerSocketChannel.class)  // 设置 NIO 网络套接字通道类型
                .option(ChannelOption.SO_BACKLOG, 128)  // 设置线程队列维护的连接个数
                .childOption(ChannelOption.SO_KEEPALIVE, true)  // 设置连接状态行为, 保持连接状态
                .childHandler(  // 为 WorkerGroup 线程池对应的 NioEventLoop 设置对应的事件 处理器 Handler
                        new ChannelInitializer<SocketChannel>() {// 创建通道初始化对象
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                // 该方法在服务器与客户端连接建立成功后会回调
                                // 为 管道 Pipeline 设置处理器 Hanedler
                                // 这里暂时设置为 null , 执行不会失败 , 服务器绑定端口会成功
                                ch.pipeline().addLast(null);
                            }
                        }
                );
        System.out.println("服务器准备完毕 ...");

        ChannelFuture channelFuture = null;
        try {
            // 绑定本地端口, 进行同步操作 , 并返回 ChannelFuture
            channelFuture = bootstrap.bind(8888).sync();
            System.out.println("服务器开始监听 8888 端口 ...");
			
			// ( 本次示例核心代码 ) ----------------------------------------------------------
            // 监听绑定操作的结果 ( 本次示例核心代码 )
            // 添加 ChannelFutureListener 监听器, 监听 bind 操作的结果
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if(future.isDone()){
                        System.out.println("绑定端口完成");
                    }

                    if(future.isSuccess()){
                        System.out.println("绑定端口成功");
                    }else{
                        System.out.println("绑定端口失败");
                    }

                    if(future.isCancelled()){
                        System.out.println("绑定端口取消");
                    }

                    System.out.println("失败原因 : " + future.cause());
                }
            });
            // ( 本次示例核心代码 ) ----------------------------------------------------------


            // 关闭通道 , 开始监听操作
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 出现异常后, 优雅的关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

2 . 执行结果 : 执行上述服务器 , 由此可见 绑定 bind 操作执行完成 , 并且执行成功 , 没有失败 , 因此失败原因为 null ;