zl程序教程

您现在的位置是:首页 >  后端

当前栏目

nio与netty编程(一)

nio编程 Netty
2023-06-13 09:13:58 时间

文章目录

一 多线程编程

线程是比进程更小的能独立运行的基本单位,它是进程的一部分,一个进程可以拥有多个线程,但至少要有一个线程,即主执行线程(Java 的 main 方法)。我们既可以编写单线程应用,也可以编写多线程应用。 一个进程中的多个线程可以并发(同时)执行,在一些执行时间长、需要等待的任务上(例如:文件读写和网络传输等),多线程就比较有用了。 怎么理解多线程呢?来两个例子:

  1. 进程就是一个工厂,一个线程就是工厂中的一条生产线,一个工厂至少有一条生产线,只有一条生产线就是单线程应用,拥有多条生产线就是多线程应用。多条生产线可以同时运行。
  2. 我们使用迅雷可以同时下载多个视频,迅雷就是进程,多个下载任务就是线程,这几个线程可以同时运行去下载视频。

多线程可以共享内存、充分利用 CPU,通过提高资源(内存和 CPU)使用率从而提高程序的执行效率。CPU 使用抢占式调度模式在多个线程间进行着随机的高速的切换。对于 CPU的一个核而言,某个时刻,只能执行一个线程,而 CPU 在多个线程间的切换速度相对我们 的感觉要快很多,看上去就像是多个线程或任务在同时运行。 Java 天生就支持多线程并提供了两种编程方式,一个是继承 Thread 类,一个是实现Runnable 接口

线程安全

产生原因

多个线程操作的是同一个共享资源,但是线程之间是彼此独立、互相隔绝的,因此就会出现数据(共享资源)不能同步更新的情况,这就是线程安全问题

package com.xiepanpan.thread.safe;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description: 销售窗口  有线程安全问题
 */
public class SaleWindow implements Runnable{

    private int id = 10;

    /**
     * 卖10张火车票
     */
    public void run() {

        for (int i = 0; i < 10; i++) {
            if (id>0) {
                System.out.println(Thread.currentThread().getName()+"卖了编号为"+id+"的火车票");

                id--;
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.xiepanpan.thread.safe;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  两个线程随机售票 出现线程安全问题
 */
public class TestSaleWindow {

    public static void main(String[] args) {
        SaleWindow saleWindow = new SaleWindow();
        Runnable target;
        Thread t1 = new Thread(saleWindow);
        Thread t2 = new Thread(saleWindow);

        t1.setName("窗口A");
        t2.setName("窗口B");

        t1.start();
        t2.start();
    }
}

解决线程安全问题

Java 中提供了一个同步机制(锁)来解决线程安全问题,即让操作共享数据的代码在某一时间段,只被一个线程执行(锁住),在执行过程中,其他线程不可以参与进来,这样共享数据就能同步了。简单来说,就是给某些代码加把锁。 锁是什么?又从哪儿来呢?锁的专业名称叫监视器 monitor,其实 Java 为每个对象都自动内置了一个锁(监视器 monitor),当某个线程执行到某代码块时就会自动得到这个对象的锁,那么其他线程就无法执行该代码块了,一直要等到之前那个线程停止(释放锁)。需要 特别注意的是:多个线程必须使用同一把锁(对象)。 Java 的同步机制提供了两种实现方式:

  1. 同步代码块:即给代码块上锁,变成同步代码块
  2. 同步方法:即给方法上锁,变成同步方法
synchronized 同步代码块
package com.xiepanpan.thread.safe;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description: 销售窗口 同步代码块
 */
public class SaleWindow1 implements Runnable{

    private int id = 10;

    /**
     * 卖10张火车票
     */
    public void run() {

        for (int i = 0; i < 10; i++) {
            synchronized (this) {
                if (id>0) {
                    System.out.println(Thread.currentThread().getName()+"卖了编号为"+id+"的火车票");

                    id--;
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        }
    }
}
package com.xiepanpan.thread.safe;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  两个线程随机售票 使用同步代码块解决线程安全问题
 */
public class TestSaleWindow1 {

    public static void main(String[] args) {
        SaleWindow1 saleWindow1 = new SaleWindow1();
        Runnable target;
        Thread t1 = new Thread(saleWindow1);
        Thread t2 = new Thread(saleWindow1);

        t1.setName("窗口A");
        t2.setName("窗口B");

        t1.start();
        t2.start();
    }
}
synchronized 同步代码块
package com.xiepanpan.thread.safe;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description: 销售窗口 使用同步方法 默认使用this作为锁
 */
public class SaleWindow2 implements Runnable{

    private int id = 10;

    /**
     * 卖10张火车票
     */
    public void run() {

        for (int i = 0; i < 10; i++) {
            saleOne();

        }
    }

    private synchronized void saleOne() {
        if (id>0) {
            System.out.println(Thread.currentThread().getName()+"卖了编号为"+id+"的火车票");

            id--;
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.xiepanpan.thread.safe;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  两个线程随机售票 使用同步方法解决线程安全问题
 */
public class TestSaleWindow2 {

    public static void main(String[] args) {
        SaleWindow2 saleWindow2 = new SaleWindow2();
        Runnable target;
        Thread t1 = new Thread(saleWindow2);
        Thread t2 = new Thread(saleWindow2);

        t1.setName("窗口A");
        t2.setName("窗口B");

        t1.start();
        t2.start();
    }
}

通过查看源码,我们发现 StringBuffer 和 Vector 类中的大部分方法都是同步方法,所以证明这两个类在使用时是保证线程安全的;而StringBuilder和ArrayList类中的方法都是普通方法, 没有使用 synchronized 关键字进行修饰,所以证明这两个类在使用时不保证线程安全。线程安全和性能之间不可兼得,保证线程安全就会损失性能,保证性能就不能满足线程安全。

线程间通信

多个线程并发执行时, 在默认情况下 CPU 是随机性的在线程之间进行切换的,但是有时候我们希望它们能有规律的执行, 那么,多线程之间就需要一些协调通信来改变或控制 CPU的随机性。Java 提供了等待唤醒机制来解决这个问题,具体来说就是多个线程依靠一个同步 锁,然后借助于 wait()和 notify()方法就可以实现线程间的协调通信。 同步锁相当于中间人的作用,多个线程必须用同一个同步锁(认识同一个中间人),只有同一个锁上的被等待的线程,才可以被持有该锁的另一个线程唤醒,使用不同锁的线程之间不能相互唤醒,也就无法协调通信。

Java 在 Object 类中提供了一些方法可以用来实现线程间的协调通信,我们一起来了解 一下:

  • public final void wait(); 让当前线程释放锁
  • public final native void wait(long timeout); 让当前线程释放锁,并等待 xx 毫秒
  • public final native void notify(); 唤醒持有同一锁的某个线程
  • public final native void notifyAll(); 唤醒持有同一锁的所有线程

需要注意的是:在调用 wait 和 notify 方法时,当前线程必须已经持有锁,然后才可以调用,否则将会抛出 IllegalMonitorStateException 异常。接下来咱们通过两个案例来演示一下具体如何编程实现线程间通信。

两个线程交替运行

两个线程交替输出信息

为了保证两个线程使用的一定是同一个锁,我们创建一个对象作为静态属性放到一个类中,这个对象就用来充当锁

package com.xiepanpan.thread.communication.num;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  锁对象
 */
public class MyLock {

    public static Object object = new Object();
}

该线程输出十次 1,使用 MyLock.o 作为锁,每输出一个 1 就唤醒另一个线程,然后自己休眠并释放锁。

package com.xiepanpan.thread.communication.num;



/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  输出数字1的线程
 */
public class ThreadForNumOne extends Thread{

    public void run() {
        for (int i = 0; i < 10; i++) {

            synchronized (MyLock.object) {
                System.out.println(1);
                MyLock.object.notify();
                try {
                    MyLock.object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.xiepanpan.thread.communication.num;


/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  输出数字2的线程
 */
public class ThreadForNumTwo extends Thread{

    public void run() {
        for (int i = 0; i < 10; i++) {

            synchronized (MyLock.object) {
                System.out.println(2);
                MyLock.object.notify();
                try {
                    MyLock.object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

该线程输出十次 2,也使用 MyLock.o 作为锁,每输出一个 2 就唤醒另一个线程,然后自己休眠并释放锁。

package com.xiepanpan.thread.communication.num;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  测试两个线程交替执行
 */
public class TestThreadForNum {

    public static void main(String[] args) {
        new ThreadForNumOne().start();
        new ThreadForNumTwo().start();
    }
}

生产者消费者

生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用 List 集合存储数据。该模式的关键之处是如何处理多线程之间的协调通信,内存缓冲区为空的时候,消费者必须等待,而内存缓冲区满的时候,生产者必须等待,其他时候可以是个动态平衡。

这里实现一个生产者一个消费者

定义一个静态集合作为内存缓冲区用来存储数据,同时这个集合也可以作为锁去被多个线程使用。

package com.xiepanpan.thread.communication.producerconsumer;

import java.util.ArrayList;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description: 篮子
 */
public class Basket {

    public static ArrayList<String> basket = new ArrayList<String>();
}

生产者不断的往集合(筐)里放水果,当筐满了就停,同时释放锁。

package com.xiepanpan.thread.communication.producerconsumer;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  农民类 相当于生产者
 */
public class Farmer extends Thread{

    public void run() {
        while (true) {
            synchronized (Basket.basket) {
                //篮子满了就不要放了 让农夫休息一哈
                if(Basket.basket.size()==10) {
                    try {
                        Basket.basket.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                //往篮子里放水果
                Basket.basket.add("apple");
                System.out.println("农夫放了一个水果,目前篮子里有"+Basket.basket.size()+"个水果");

                //唤醒小孩继续吃
                Basket.basket.notify();

            }

            //模拟控制速度
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者不断的从集合(筐)里拿水果吃,当筐空了就停,同时释放锁。

package com.xiepanpan.thread.communication.producerconsumer;

import java.util.Random;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description: 孩子类 相当于消费者
 */
public class Child extends Thread{

    @Override
    public void run() {
        while (true) {
            synchronized (Basket.basket) {

                //篮子里没有水果 让小孩休息一哈
                if (Basket.basket.size()==0 ) {
                    try {
                        Basket.basket.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                //小孩吃水果
                Basket.basket.remove("apple");
                System.out.println("小孩吃了一个水果 目前篮子里有"+Basket.basket.size()+"个水果");

                //唤醒农夫继续放水果
                Basket.basket.notify();
            }

            try {
                Random random = new Random();
                Thread.sleep(random.nextInt(3000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.xiepanpan.thread.communication.producerconsumer;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description:  测试模拟生产者消费者模式 生产者消费者一对一
 */
public class Test {

    public static void main(String[] args) {
        new Farmer().start();
        new Child().start();
    }

}

二 bio编程

BIO 有的称之为 basic(基本) IO,有的称之为 block(阻塞) IO,主要应用于文件 IO 和网络 IO,这里不再说文件 IO, 大家对此都非常熟悉,本次课程主要讲解网络 IO。 在 JDK1.4 之前,我们建立网络连接的时候只能采用 BIO,需要先在服务端启动一个ServerSocket,然后在客户端启动 Socket 来对服务端进行通信,默认情况下服务端需要对每个请求建立一个线程等待请求,而客户端发送请求后,先咨询服务端是否有线程响应,如果没有则会一直等待或者遭到拒绝,如果有的话,客户端线程会等待请求结束后才继续执行,这就是阻塞式 IO。

服务器端程序,绑定端口号 9999,accept 方法用来监听客户端连接,如果没有客户端连接,就一直等待,程序会阻塞到这里。

package com.xiepanpan.bio;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description: bio 服务端程序  先启动
 */
public class TCPServer {

    public static void main(String[] args) throws IOException {

        // 创建ServerSocket 对象
        ServerSocket serverSocket = new ServerSocket(9999);

        while (true) {
            //监听客户端
            System.out.println("启动服务端。。");
            //①这里阻塞
            Socket socket = serverSocket.accept();
            System.out.println("已连接客户端");
            //从连接中取出输入流的接收消息
            //②阻塞
            InputStream inputStream = socket.getInputStream();
            byte[] bytes = new byte[10];
            inputStream.read(bytes);
            String clientIp = serverSocket.getInetAddress().getHostAddress();

            System.out.println(clientIp+"说:"+new String(bytes).trim());

            //从连接中取出输出流并回复
            OutputStream outputStream = socket.getOutputStream();
            outputStream.write("收到".getBytes());
            //关闭
            socket.close();

        }
    }



}

客户端程序,通过 9999 端口连接服务器端,getInputStream 方法用来等待服务器端返回数据,如果没有返回,就一直等待,程序会阻塞到这里。

package com.xiepanpan.bio;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * @author: xiepanpan
 * @Date: 2020/8/3
 * @Description: bio 客户端程序
 */
public class TCPClient {

    public static void main(String[] args) throws IOException {
        while(true) {

            //创建socket对象
            Socket socket = new Socket("127.0.0.1",9999);

            //从连接中取出输出流并发送消息
            OutputStream outputStream = socket.getOutputStream();
            System.out.println("请输入:");
            Scanner scanner = new Scanner(System.in);
            String msg = scanner.nextLine();
            outputStream.write(msg.getBytes());

            //从连接中取出输入流并接收回话
            //①阻塞
            InputStream inputStream = socket.getInputStream();
            byte[] bytes= new byte[20];
            inputStream.read(bytes);
            System.out.println("长安说:"+new String(bytes).trim());
        }
    }
}

三 nio编程

java.nio 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO)。新增了许多用于处理输入输出的类,这些类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写,新增了满足 NIO 的功能。

NIO 和 BIO 有着相同的目的和作用,但是它们的实现方式完全不同,BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多。另外,NIO 是非阻塞式的,这一点跟 BIO 也很不相同,使用它可以提供非阻塞式的高伸缩性网络。

NIO 主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)。

传统的 BIO基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。

文件IO

缓冲区(Buffer):实际上是一个容器,是一个特殊的数组,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer,如下图所示:

在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,常用的 Buffer 子类有:

  • ByteBuffer,存储字节数据到缓冲区
  • ShortBuffer,存储字符串数据到缓冲区
  • CharBuffer,存储字符数据到缓冲区
  • IntBuffer,存储整数数据到缓冲区
  • LongBuffer,存储长整型数据到缓冲区
  • DoubleBuffer,存储小数到缓冲区
  • FloatBuffer,存储小数到缓冲区

对于 Java 中的基本数据类型,都有一个 Buffer 类型与之相对应,最常用的自然是ByteBuffer 类(二进制数据),该类的主要方法如下所示:

  • public abstract ByteBuffer put(byte[] b); 存储字节数据到缓冲区
  • public abstract byte[] get(); 从缓冲区获得字节数据
  • public final byte[] array(); 把缓冲区数据转换成字节数组
  • public static ByteBuffer allocate(int capacity); 设置缓冲区的初始容量
  • public static ByteBuffer wrap(byte[] array); 把一个现成的数组放到缓冲区中使用
  • public final Buffer flip(); 翻转缓冲区,重置位置到初始位置

通道(Channel):类似于 BIO 中的 stream,例如 FileInputStream 对象,用来建立到目标(文件,网络套接字,硬件设备等)的一个连接,但是需要注意:BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,既可以用来进行读操作,也可以用来进行写操作。

常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel。FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。

往本地文件中写数据

/**
 * 往本地文件中写数据
 */
@Test
public void test1() throws IOException {
    //1. 创建文件输出流
    File file;
    FileOutputStream fileOutputStream = new FileOutputStream("test.txt");
    //2. 从流中得到一个通道
    FileChannel channel = fileOutputStream.getChannel();
    //3. 提供一个缓冲区
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    //4. 往缓冲区中存入数据
    String string = "hello,nio";
    byteBuffer.put(string.getBytes());
    //5. 翻转缓冲区
    byteBuffer.flip();
    //6. 把缓冲区写到通道中
    channel.write(byteBuffer);
    //7. 关闭流
    fileOutputStream.close();
}

NIO 中的通道是从输出流对象里通过 getChannel 方法获取到的,该通道是双向的,既可以读,又可以写。在往通道里写数据之前,必须通过 put 方法把数据存到 ByteBuffer 中,然后通过通道的 write 方法写数据。在 write 之前,需要调用 flip 方法翻转缓冲区,把内部重置到初始位置,这样在接下来写数据时才能把所有数据写到通道里。

从本地文件中读数据

/**
 * 从本地文件中读取数据
 * @throws FileNotFoundException
 */
@Test
public void test2() throws IOException {
    //1. 创建输入流
    File file = new File("test.txt");
    FileInputStream fileInputStream = new FileInputStream(file);
    //2. 得到一个通道
    FileChannel channel = fileInputStream.getChannel();
    //3. 准备一个缓冲区
    ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
    //4. 从通道里读取数据并保存到缓冲区中
    channel.read(byteBuffer);
    System.out.println(new String(byteBuffer.array()));
    //5. 关闭
    fileInputStream.close();
}

上述代码从输入流中获得一个通道,然后提供 ByteBuffer 缓冲区,该缓冲区的初始容量和文件的大小一样,最后通过通道的 read 方法把数据读取出来并存储到了 ByteBuffer 中。

复制文件

/**
 * 使用NIO实现文件复制
 * @throws IOException
 */
@Test
public void test3() throws IOException {
    //1. 创建两个流
    FileInputStream fileInputStream = new FileInputStream("test.txt");
    FileOutputStream fileOutputStream = new FileOutputStream("D:\\test.txt");

    //2. 得到两个通道
    FileChannel sourceFileChannel = fileInputStream.getChannel();
    FileChannel destFileChannel = fileOutputStream.getChannel();

    //3. 复制
    destFileChannel.transferFrom(sourceFileChannel,0,sourceFileChannel.size());

    //4. 关闭
    fileInputStream.close();
    fileOutputStream.close();
}

通过 BIO 复制一个视频文件:

	@Test
    public void test3() throws Exception{
        FileInputStream fis=new FileInputStream("C:\\Users\\zdx\\Desktop\\oracle.mov");
        FileOutputStream fos=new FileOutputStream("d:\\oracle.mov");
        byte[] b=new byte[1024];
        while (true) {
            int res=fis.read(b);
            if(res==-1){
                break;
            }
            fos.write(b,0,res);
        }
        fis.close();
        fos.close();
    }

上述代码分别通过输入流和输出流实现了文件的复制,这是通过传统的 BIO 实现的

通过 NIO 复制相同的视频文件,代码如下所示:

	@Test
    public void test4() throws Exception{
        FileInputStream fis=new FileInputStream("C:\\Users\\zdx\\Desktop\\oracle.mov");
        FileOutputStream fos=new FileOutputStream("d:\\oracle.mov");
        FileChannel sourceCh = fis.getChannel();
        FileChannel destCh = fos.getChannel();
        destCh.transferFrom(sourceCh, 0, sourceCh.size());
        sourceCh.close();
        destCh.close();
    }

上述代码分别从两个流中得到两个通道,sourceCh 负责读数据,destCh 负责写数据,然后直接调用 transferFrom 方法一步到位实现了文件复制。

网络IO

文件 IO 时用到的 FileChannel 并不支持非阻塞操作,学习 NIO 主要就是进行网络 IO,Java NIO 中的网络通道是非阻塞 IO 的实现,基于事件驱动,非常适用于服务器需要维持大量连接,但是数据交换量不大的情况,例如一些即时通信的服务等等… 在 Java 中编写 Socket 服务器,通常有以下几种模式:

  • 一个客户端连接用一个线程,优点:程序编写简单;缺点:如果连接非常多,分配的线程也会非常多,服务器可能会因为资源耗尽而崩溃。
  • 把每一个客户端连接交给一个拥有固定数量线程的连接池,优点:程序编写相对简单,可以处理大量的连接。确定:线程的开销非常大,连接如果非常多,排队现象会比较严重。
  • 使用 Java 的 NIO,用非阻塞的 IO 方式处理。这种模式可以用一个线程,处理大量的客户端连接。
  1. Selector(选择器),能够检测多个注册的通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接。这样使得只有在连接真正有读写事件发生时,才会调用函数来进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并且避免了多线程之间的上下文切换导致的开销。

该类的常用方法如下所示:

  • public static Selector open(),得到一个选择器对象
  • public int select(long timeout),监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
  • public Set<SelectionKey> selectedKeys(),从内部集合中得到所有的 SelectionKey
  1. SelectionKey,代表了 Selector 和网络通道的注册关系,一共四种:
  • int OP_ACCEPT:有新的网络连接可以 accept,值为 16
  • int OP_CONNECT:代表连接已经建立,值为 8
  • int OP_READ 和 int OP_WRITE:代表了读、写操作,值为 1 和 4

该类的常用方法如下所示:

  • public abstract Selector selector(),得到与之关联的 Selector 对象
  • public abstract SelectableChannel channel(),得到与之关联的通道
  • public final Object attachment(),得到与之关联的共享数据
  • public abstract SelectionKey interestOps(int ops),设置或改变监听事件
  • public final boolean isAcceptable(),是否可以 accept
  • public final boolean isReadable(),是否可以读
  • public final boolean isWritable(),是否可以写
  1. ServerSocketChannel,用来在服务器端监听新的客户端 Socket 连接,常用方法如下所示:
  • public static ServerSocketChannel open(),得到一个 ServerSocketChannel 通道
  • public final ServerSocketChannel bind(SocketAddress local),设置服务器端端口号
  • public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
  • public SocketChannel accept(),接受一个连接,返回代表这个连接的通道对象
  • public final SelectionKey register(Selector sel, int ops),注册一个选择器并设置监听事件
  1. SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 总是把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。常用方法如下所示:
  • public static SocketChannel open(),得到一个 SocketChannel 通道
  • public final SelectableChannel configureBlocking(boolean block),设置阻塞或非阻塞模式, 取值 false 表示采用非阻塞模式
  • public boolean connect(SocketAddress remote),连接服务器
  • public boolean finishConnect(),如果上面的方法连接失败,接下来就要通过该方法完成 连接操作
  • public int write(ByteBuffer src),往通道里写数据
  • public int read(ByteBuffer dst),从通道里读数据
  • public final SelectionKey register(Selector sel, int ops, Object att),注册一个选择器并设置 监听事件,最后一个参数可以设置共享数据
  • public final void close(),关闭通道

nio网路通信demo

package com.xiepanpan.nio.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * @author: xiepanpan
 * @Date: 2020/8/4
 * @Description: nio实现网络通信 服务端
 */
public class NIOServer {

    public static void main(String[] args) throws IOException {
        //1、 得到一个ServerSocketChannel  老大
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //2、 得到一个Selector对象 间谍
        Selector selector = Selector.open();
        //3. 绑定端口
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //4、设置非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //5. 把ServerSocketChannel对象注册给Selector对象
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //6. 干活
        while (true) {
            //监控客户端
            if (selector.select(2000)==0) {
                System.out.println("server: 没有客户端搭理我 我干别的事");
                continue;
            }
            //得到SelectionKey 判断通道里的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()){
                    //客户端连接请求事件
                    System.out.println("OP_ACCEPT");
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));
                }

                //读取客户端数据事件
                if (selectionKey.isReadable()) {
                    SocketChannel channel = (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
                    channel.read(byteBuffer);
                    System.out.println("客户端发来数据:"+new String(byteBuffer.array()));
                }

                //手动从集合中移除当前key 防止重复处理
                iterator.remove();
            }
        }
    }
}
package com.xiepanpan.nio.socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @author: xiepanpan
 * @Date: 2020/8/4
 * @Description:  nio实现网络通信  客户端
 */
public class NIOClient {
    public static void main(String[] args) throws IOException {
        //1. 得到一个网络通道
        SocketChannel socketChannel = SocketChannel.open();
        //2. 设置非阻塞方式
        socketChannel.configureBlocking(false);
        //3、 提供服务端的ip地址和端口号
        String host;
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 9999);
        //4. 连接服务器端
        if (!socketChannel.connect(inetSocketAddress)) {
            while (!socketChannel.finishConnect()) {
                System.out.println("Client:连接服务器的同时 我还可以干别的事情");
            }
        }
        //5. 得到一个缓冲区并存入数据
        String msg = "hello server";
        ByteBuffer wrap = ByteBuffer.wrap(msg.getBytes());
        //6、发送数据
        socketChannel.write(wrap);
        System.in.read();
    }
}

多人网络聊天

聊天程序的服务器端,可以接受客户端发来的数据,并能把数据广播给所有客户端

package com.xiepanpan.nio.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description:  nio实现多人聊天 服务端
 */
public class ChatServer {

    //监听通道 (老大)
    private  ServerSocketChannel serverSocketChannel;

    //选择器对象 (间谍)
    private Selector selector;

    private static final int PORT =9999;

    public ChatServer() throws IOException {
        //得到监听通道
        serverSocketChannel = ServerSocketChannel.open();

        selector = Selector.open();

        serverSocketChannel.bind(new InetSocketAddress(PORT));

        serverSocketChannel.configureBlocking(false);

        //将选择器绑定到监听通道并监听accept事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        printInfo("chat server is ready...");
    }

    public static void main(String[] args) throws IOException {
        new ChatServer().start();
    }

    /**
     * 往控制台打印消息
     * @param string
     */
    private void printInfo(String string) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("["+simpleDateFormat.format(new Date())+"]"+ string);
    }



    private void start() throws IOException {
        while (true) {
            if (selector.select(2000) == 0) {
                System.out.println("server: 没有客户端找我 我就干别的事情");
                continue;
            }

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ);
                    System.out.println(socketChannel.getRemoteAddress().toString().substring(1)+"上线了。。。");
                }

                //读数据
                if (selectionKey.isReadable()) {
                    readMsg(selectionKey);
                }

                //
                iterator.remove();
            }
        }
    }

    private void readMsg(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int count = socketChannel.read(byteBuffer);
        if (count>0) {
            String msg = new String(byteBuffer.array());
            printInfo(msg);

            //广播消息
            broadCast(socketChannel,msg);
        }
    }

    /**
     * 向所有客户端发送广播
     * @param socketChannel
     * @param msg
     */
    private void broadCast(SocketChannel socketChannel, String msg) throws IOException {
        System.out.println("服务器发送了广播");
        for (SelectionKey selectionKey:selector.keys()) {
            Channel targetChannel = selectionKey.channel();
            //是socketChannel 但不是自身  消息不广播给自己
            if (targetChannel instanceof SocketChannel && targetChannel!=socketChannel) {
                SocketChannel destChannel = (SocketChannel) targetChannel;
                ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
                destChannel.write(byteBuffer);
            }
        }
    }
}

聊天程序的客户端,可以向服务器端发送数据,并能接收服务器广播的数据。

package com.xiepanpan.nio.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description:  nio实现多人聊天 客户端
 */
public class ChatClient {

    //服务器地址
    private final String HOST = "127.0.0.1";
    private int PORT =9999;
    //网络通道
    private SocketChannel socketChannel;
    //用户名
    private String username;

    public ChatClient() throws IOException {
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        InetSocketAddress inetSocketAddress = new InetSocketAddress(HOST, PORT);
        if (!socketChannel.connect(inetSocketAddress)) {
            while (!socketChannel.finishConnect()) {
                System.out.println("Client:连接服务器端的同时 我还可以干别的事情");
            }
        }

        username = socketChannel.getLocalAddress().toString().substring(1);

        System.out.println("=======client"+username+" is ready========");
    }

    /**
     * 向服务端发送数据
     * @param msg
     */
    public void sendMsg(String msg) throws IOException {

        //发送 bye表示聊天结束
        if (msg.equalsIgnoreCase("bye")) {
            socketChannel.close();
            return;
        }

        msg = username+"说:"+msg;
        ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
        socketChannel.write(byteBuffer);

    }

    /**
     * 从服务器端接收数据
     */
    public void receiveMsg() throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int size = socketChannel.read(byteBuffer);
        if (size>0) {
            String msg = new String(byteBuffer.array());
            System.out.println(msg.trim());
        }
    }
}
package com.xiepanpan.nio.chat;

import java.io.IOException;
import java.util.Scanner;

/**
 * @author: xiepanpan
 * @Date: 2020/8/5
 * @Description:  多人聊天测试类
 */
public class TestChat {

    public static void main(String[] args) throws IOException {
        final ChatClient chatClient = new ChatClient();

        new Thread(){
            @Override
            public void run() {
                while (true) {
                    try {
                        chatClient.receiveMsg();
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String msg = scanner.nextLine();
            chatClient.sendMsg(msg);
        }
    }


}

配置这里可以启动多个客户端

AIO

JDK 7 引入了 Asynchronous I/O,即 AIO。在进行 I/O 编程中,常用到两种模式:Reactor和 Proactor。Java 的 NIO 就是 Reactor,当有事件触发时,服务器端得到通知,进行相应的处理。 AIO 即 NIO2.0,叫做异步不阻塞的 IO。AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,一个有效的请求才启动一个线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。

IO 对比总结

IO 的方式通常分为几种:同步阻塞的 BIO、同步非阻塞的 NIO、异步非阻塞的 AIO。

  • BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序直观简单易理解。
  • NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4 开始支持。
  • AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

举个例子再理解一下:

  • 同步阻塞:你到饭馆点餐,然后在那等着,啥都干不了,饭馆没做好,你就必须等着!
  • 同步非阻塞:你在饭馆点完餐,就去玩儿了。不过玩一会儿,就回饭馆问一声:好了没啊!
  • 异步非阻塞:饭馆打电话说,我们知道您的位置,一会给你送过来,安心玩儿就可以了,类似于现在的外卖。

对比总结

BIO

NIO

AIO

IO方式

同步阻塞

同步非阻塞(多路复用)

异步非阻塞

API使用难度

简单

复杂

复杂

可靠性

吞吐量