Java NIO案例
2023-09-11 14:21:36 时间
Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码) http://blog.csdn.net/anxpp/article/details/51512200
Java NIO框架Netty简单使用 http://blog.csdn.net/anxpp/article/details/52139155
使用最新Netty实现一个简单的聊天程序 http://blog.csdn.net/anxpp/article/details/52139155
服务端、客户端
package com.dsp.nio; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * * 监控是否可连接、可读、可写 * * 代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler * */ public class Reactor implements Runnable { private static Logger log = LoggerFactory.getLogger(Reactor.class); final Selector selector; final ServerSocketChannel serverSocket; /** * 服务端配置初始化,监听端口 * @param port * @throws IOException */ public Reactor(int port) throws IOException { this.selector = Selector.open(); this.serverSocket = ServerSocketChannel.open(); this.serverSocket.socket().bind(new InetSocketAddress(port)); this.serverSocket.configureBlocking(false); SelectionKey selectionKey = this.serverSocket.register(selector, SelectionKey.OP_ACCEPT); // 利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor selectionKey.attach(new Acceptor()); log.info("===>>> attach(new Acceptor())"); } /* * SPI */ // Alternatively, use explicit SPI provider // SelectorProvider selectorProvider = SelectorProvider.provider(); // selector = selectorProvider.openSelector(); // serverSocket = selectorProvider.openServerSocketChannel(); /** * 分发请求 * * @param selectionKey */ void dispatch(SelectionKey selectionKey) { Runnable run = (Runnable) (selectionKey.attachment()); if (run != null) { run.run(); } } /** * 监听连接和channel是否就绪 */ public void run() { try { /** * 线程未被中断 */ while (!Thread.interrupted()) { int readySize = this.selector.select(); log.info("I/O ready size = {}", readySize); Set<?> selectedKeys = this.selector.selectedKeys(); Iterator<?> iterator = selectedKeys.iterator(); // Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。 while (iterator.hasNext()) { /* * 一个新的连接,第一次出发Accepter线程任务,之后触发Handler线程任务 */ SelectionKey selectionKey = (SelectionKey) iterator.next(); log.info("===>>> acceptable = {}, connectable = {}, readable = {}, writable = {}.",
selectionKey.isAcceptable(), selectionKey.isConnectable(),
selectionKey.isReadable(), selectionKey.isWritable()); dispatch(selectionKey); } selectedKeys.clear(); } } catch (IOException ex) { log.info("reactor stop!" + ex); } } /** * 处理新连接 * * @author dsp * */ class Acceptor implements Runnable { @Override public void run() { try { log.debug("===>>> ready for accept!"); SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null) { new Handler(selector, socketChannel); } } catch (IOException ex) { /* . . . */ } } } }
package com.dsp.nio; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /** * * 处理读写 * */ final class Handler implements Runnable { private static Logger log = LoggerFactory.getLogger(Reactor.class); static final int MAX_IN = 1024; static final int MAX_OUT = 1024; ByteBuffer inputBuffer = ByteBuffer.allocate(MAX_IN); ByteBuffer output = ByteBuffer.allocate(MAX_OUT); final SocketChannel socketChannel; final SelectionKey selectionKey; static final int READING = 0, SENDING = 1; int state = READING; /** * 注意在Handler里面又执行了一次attach,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler,从而开始了数据的 * “读 -> 处理 -> 写 -> 发出” 等流程处理。 * * @param selector * @param socketChannel * @throws IOException */ Handler(Selector selector, SocketChannel socketChannel) throws IOException { this.socketChannel = socketChannel; this.socketChannel.configureBlocking(false); this.selectionKey = this.socketChannel.register(selector, 0); this.selectionKey.attach(this); this.selectionKey.interestOps(SelectionKey.OP_READ); // selector.wakeup(); } /** * 只是返回true,具体的判断没有实现 * * @return */ boolean inputIsComplete() { return true; } /** * 只是返回true,具体的判断没有实现 * * @return */ boolean outputIsComplete() { return true; } /** * 处理数据(无具体实现) */ void process(String msg) { // output.put("hello world, hello dsp!".getBytes()); String outMsg = "out + " + msg; output.put(outMsg.getBytes()); output.flip(); } /** * 读取请求数据并处理 * * @throws IOException */ void read() throws IOException { log.info("===>>> read into bytebuffer from socketchannel inputs."); if (inputIsComplete()) { socketChannel.read(inputBuffer); inputBuffer.flip(); byte[] inputBytes = new byte[inputBuffer.limit()]; inputBuffer.get(inputBytes); String inputString = new String(inputBytes); log.info("===>>> 从客户端读取请求信息 = {}", inputString); log.info("===>>> read complete."); process(inputString); state = SENDING; // 读完了数据之后,注册OP_WRITE事件 selectionKey.interestOps(SelectionKey.OP_WRITE); } } /** * 返回响应信息 * * @throws IOException */ void send() throws IOException { log.info("===>>> write into socketchannel from bytebuffer outputs"); socketChannel.write(output); if (outputIsComplete()) { // The key will be removed from all of the selector's key sets during the next // selection operation. selectionKey.cancel(); // 关闭通过,也就关闭了连接 socketChannel.close(); log.info("===>>> close socketchannel after write complete"); } } @Override public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* . . . */ } } }
package com.dsp.nio; import java.io.IOException; /** * * Model: Reactor in SingleThread * * 利用NIO多路复用机制,多路IO复用一个线程 * * @author dsp * */ public class ReactorInSingleThreadServer { public static void main(String args[]) throws IOException { Reactor reactor = new Reactor(9999); reactor.run(); // 不会开启线程,相当于普通方法调用 } }
package com.dsp.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * 访问NIO服务器的客户端 * * @author dsp * */ public class ReactorInSingleThreadClient extends Thread { private static Logger log = LoggerFactory.getLogger(ReactorInSingleThreadClient.class); private static LinkedBlockingQueue<Thread> failureQueue = new LinkedBlockingQueue<Thread>(); @Override public void run() { try { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = SocketChannel.open(); boolean connected = socketChannel.connect(new InetSocketAddress(9999)); if (connected) { log.info("===>>> 和服务器 {} 已连接...", socketChannel.getRemoteAddress()); /* * 请求 */ String msg = "in + 你好,dsp!" + Thread.currentThread().getName(); buffer.put(msg.getBytes()); buffer.flip(); socketChannel.write(buffer); buffer.clear(); /* * 响应 */ buffer.clear(); socketChannel.read(buffer); buffer.flip(); byte[] data = new byte[buffer.limit()]; buffer.get(data); String string = new String(data); log.info("===>>> " + string); buffer.clear(); socketChannel.close(); } else { log.error("连不上服务器..."); } } catch (java.net.ConnectException e) { failureQueue.offer(this); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException, InterruptedException { int maxThreads = 3000; while (maxThreads-- > 0) { new Thread(new ReactorInSingleThreadClient()).start(); } Thread.sleep(Integer.MAX_VALUE); } }
^_^
相关文章
- 你不知道的java对象序列化的秘密
- Java 高斯分布随机数
- Java核心技术卷I基础知识3.7.3 文件输入与输出
- Java: TCP 文件上传
- CSDN日报191016:Java纯干货分享:史上最全的JAVA工程师面试题汇总
- 堪称2022最强Java八股文面试题汇总
- 第七节:详细讲解Java中的日期,java.util.date
- Java中toArray的用法探究(java数组与list转换)
- Java多线程 5 多线程其他知识简要介绍
- 《Java程序员面试秘笈》—— 第1章 线程管理 1.1 简介
- java验证码组件kaptcha使用方法
- java 根据实体对象生成 增删改的SQL语句 ModelToSQL
- 在java中使用JMH(Java Microbenchmark Harness)做性能测试
- 《Java学习笔记》:日期类常用方法全归纳,值得收藏。
- JVM(Java虚拟机)优化大全和案例实战
- Java_java动态编译整个项目,解决jar包找不到问题
- Java之throw和throws的区别及java中的异常处理
- 华为OD机试 -滑动窗口最大和(Java) | 机试题+算法思路+考点+代码解析 【2023】
- Java中需要注意的一些案例
- java设计模式案例详解:工厂模式
- Java 实现原型(Prototype)模式
- [java][db]JAVA分布式事务原理及应用
- 2014-5-22 java.lang.OutOfMemoryError: Java heap space的一次诊断
- 【java案例】:模拟物流快递系统程序设计
- 【java养成】:案例(打印三角形,超市购物、随机点名)
- java泛型(泛型接口、泛型类、泛型方法)