【转载】example for NIO
nio for 转载 Example
2023-09-14 08:57:28 时间
NIO-Socket通讯,为我们解决了server端多线程设计方面的性能/吞吐量等多方面的问题,它提供了以非阻塞模式 + 线程池的方式来解决Server端高并发问题..NIO并不能显著的提升Client-server的通讯性能(其中包括全局性耗时总和,Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应的并发量情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理的使用了平台(OS/VM/Http协议)的特性并提供了高效的便捷的编程级别的API.
为了展示,NIO交互的基本特性,我们模拟了一个简单的场景:Client端向server端建立连接,并持续交付大量数据,Server负载client的数据传输和处理.此程序实例并没有太多的关注异常处理和业务性处理,也没有使用线程池作为server端socket句柄管理,不过你可以简单的修改代码也实现它.
TestMain.java:引导类 ClientControllor.java:client连接处理类,负责队列化数据提交,并负责维护socket句柄. Packet.java:对于读取或者写入的buffer,进行二次封装,使其具有更好的可读性. ServerControllor.java:server端连接处理类,负责接收连接和数据处理 ServerHandler.java:server端连接维护类.TestMain.java:
public static void main(String[] args) throws Exception{ int port = 30008; ServerControllor sc = new ServerControllor(port); sc.start(); Thread.sleep(2000); ClientControllor cc = new ClientControllor("127.0.0.1", port); cc.start(); Packet p1 = Packet.wrap("Hello,I am first!"); cc.put(p1); Packet p2 = Packet.wrap("Hello,I am second!"); cc.put(p2); Packet p3 = Packet.wrap("Hello,I am thread!"); cc.put(p3); }
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.zip.Adler32; import java.util.zip.Checksum; public class ClientControllor { private BlockingQueue Packet inner = new LinkedBlockingQueue Packet (100);//no any more private Object lock = new Object(); private InetSocketAddress remote; private Thread thread = new ClientThread(remote); public ClientControllor(String host,int port){ remote = new InetSocketAddress(host, port); } public void start(){ if(thread.isAlive() || remote == null){ return; } synchronized (lock) { thread.start(); } } public boolean put(Packet packet){ return inner.offer(packet); } public void clear(){ inner.clear(); } class ClientThread extends Thread { SocketAddress remote; SocketChannel channel; ClientThread(SocketAddress remote){ this.remote = remote; } @Override public void run(){ try{ try{ channel = SocketChannel.open(); channel.configureBlocking(true); boolean isSuccess = channel.connect(new InetSocketAddress(30008)); if(!isSuccess){ while(!channel.finishConnect()){ System.out.println("Client is connecting..."); } } System.out.println("Client is connected."); // Selector selector = Selector.open(); // channel.register(selector, SelectionKey.OP_WRITE); // while(selector.isOpen()){ // selector.select(); // Iterator SelectionKey it = selector.selectedKeys().iterator(); // while(it.hasNext()){ // SelectionKey key = it.next(); // it.remove(); // if(!key.isValid()){ // continue; // } // if(key.isWritable()){ // write(); // } // } // } while(channel.isOpen()){ write(); } }catch(Exception e){ e.printStackTrace(); }finally{ if(channel != null){ try{ channel.close(); }catch(Exception ex){ ex.printStackTrace(); } } } }catch(Exception e){ e.printStackTrace(); inner.clear(); } } private void write() throws Exception{ Packet packet = inner.take(); synchronized (lock) { ByteBuffer body = packet.getBuffer();// ByteBuffer head = ByteBuffer.allocate(4); head.putInt(body.limit()); head.flip(); while(head.hasRemaining()){ channel.write(head); } Checksum checksum = new Adler32(); while(body.hasRemaining()){ checksum.update(body.get()); } body.rewind(); while(body.hasRemaining()){ channel.write(body); } long cks = checksum.getValue(); ByteBuffer tail = ByteBuffer.allocate(8); tail.putLong(cks); tail.flip(); while(tail.hasRemaining()){ channel.write(tail); } } } }
public String getDataAsString(){ return charset.decode(buffer).toString(); } public byte[] getData(){ return buffer.array(); } public ByteBuffer getBuffer(){ return this.buffer; } public static Packet wrap(ByteBuffer buffer){ return new Packet(buffer); } public static Packet wrap(String data){ ByteBuffer source = charset.encode(data); return new Packet(source); }
import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class ServerControllor { private int port; private Thread thread = new ServerThread();; private Object lock = new Object(); public ServerControllor(){ this(0); } public ServerControllor(int port){ this.port = port; } public void start(){ if(thread.isAlive()){ return; } synchronized (lock) { thread.start(); System.out.println("Server starting...."); } } class ServerThread extends Thread { private static final int TIMEOUT = 3000; private ServerHandler handler = new ServerHandler(); @Override public void run(){ try{ ServerSocketChannel channel = null; try{ channel = ServerSocketChannel.open(); channel.configureBlocking(false); channel.socket().setReuseAddress(true); channel.socket().bind(new InetSocketAddress(port)); Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_ACCEPT); while(selector.isOpen()){ System.out.println("Server is running,port:" + channel.socket().getLocalPort()); if(selector.select(TIMEOUT) == 0){ continue; } Iterator SelectionKey it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = it.next(); it.remove(); if(!key.isValid()){ continue; } if(key.isAcceptable()){ accept(key); }else if(key.isReadable()){ read(key); } } } }catch(Exception e){ e.printStackTrace(); }finally{ if(channel != null){ try{ channel.close(); }catch(Exception ex){ ex.printStackTrace(); } } } }catch(Exception e){ e.printStackTrace(); } } private void accept(SelectionKey key) throws Exception{ SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept(); socketChannel.configureBlocking(true); //socketChannel.register(key.selector(), SelectionKey.OP_READ); handler.handle(socketChannel); } private void read(SelectionKey key) throws Exception{ SocketChannel channel = (SocketChannel)key.channel(); //handler.handle(channel); } }
private static Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors() + 1); private static Map SocketChannel,Thread holder = new HashMap SocketChannel,Thread (32); @Override public void handle(SocketChannel channel) { synchronized (holder) { if(holder.containsKey(channel)){ return; } Thread t = new ReadThread(channel); holder.put(channel, t); t.start(); } } static class ReadThread extends Thread{ SocketChannel channel; ReadThread(SocketChannel channel){ this.channel = channel; } @Override public void run(){ try{ semaphore.acquire(); boolean eof = false; while(channel.isOpen()){ //ByteBuffer byteBuffer = new ByteBuffer(1024); ByteBuffer head = ByteBuffer.allocate(4);//int for data-size while(true){ int cb = channel.read(head); if(cb == -1){ throw new RuntimeException("EOF error,data lost!"); } if(isFull(head)){ break; } } head.flip(); int dataSize = head.getInt(); if(dataSize = 0){ throw new RuntimeException("Data format error,something lost???"); } ByteBuffer body = ByteBuffer.allocate(dataSize); while(true){ int cb = channel.read(body); if(cb == -1){ throw new RuntimeException("EOF error,data lost!"); }else if(cb == 0 this.isFull(body)){ break; } } ByteBuffer tail = ByteBuffer.allocate(8);//int for data-size while(true){ int cb = channel.read(tail); if(cb == -1){ eof = true; } if(isFull(tail)){ break; } } tail.flip(); long sck = tail.getLong(); Checksum checksum = new Adler32(); checksum.update(body.array(), 0, dataSize); long cck = checksum.getValue(); if(sck != cck){ throw new RuntimeException("Sorry,some data lost or be modified,please check!"); } body.flip(); Packet packet = Packet.wrap(body); System.out.println(packet.getDataAsString()); if(eof){ break; } } }catch(Exception e){ e.printStackTrace(); }finally{ if(channel != null){ try{ channel.close(); }catch(Exception ex){ ex.printStackTrace(); } } holder.remove(channel); semaphore.release(); } } private boolean isFull(ByteBuffer byteBuffer){ return byteBuffer.position() == byteBuffer.capacity() ? true : false; } }
NIO学习一 NIO相比普通IO提供了功能更为强大、处理数据更快的解决方案。 常用于高性能服务器上。NIO实现高性能处理的原理是使用较少的线程来处理更多的任务 常规io使用的byte[]、char[]进行封装,而NIO采用ByteBuffer类来操作数据,再结合 针对File或socket技术的channel,采用同步非阻塞技术来实现高性能处理,而Netty 正是采用ByteBuffer(缓冲区)、Channel(通道)、Selector(选择器)进行封装的。 因此我们需要先了解NIO相关的知识。
NIO 所谓NIO,就是New IO的缩写。是从JDK 1.4开始引入的全新的IO API。NIO将以更高效的方式进行文件的读写操作,可完全代替传统的IO API使用。而且JDK 1.7对NIO又进行了更新,可以称作NIO 2.0。
netty和nio netty是一个nio客户机-服务器框架,它简化了tcp和udp网络编程,相对于java传统nio,netty还屏蔽了操作系统的差异性,并且兼顾了性能。 Channel channel封装了对socket的原子操作,实质是对socket的封装和扩展。
bio和nio 操作系统为了限制程序的数据访问,来防止获取其他进程或外围设备数据,将CPU划分为用户态和内核态: 内核态(Kernel Mode):cpu可以访问内存的所有数据,包括外围设备,例如硬盘,网卡,cpu也可以将自己从一个程序切换到另一个程序。
在进入NIO之前,先回顾一下Java标准IO方式实现的网络server端: public class IOServerThreadPool { private static final Logger LOGGER = LoggerFactory.
使用netty的NIO来实现一个简单的TimeServer 只想以此来加深java的NIO这方面的知识点~~~ 参考书籍《netty权威指南》(第二版) 这本书,第一二三章,前面讲java的bio,nio,nio2, 讲得蛮好的。
nio之netty5应用 1、netty5和netty4的区别不是很大,但是与netty3差别还是有的。这里不介绍netty4,因为和netty5的方式都差不多。netty5的复杂性相对于netty3要多很多了。基本上架构都被重构了。
相关文章
- java nio_(一) Java NIO 概述[通俗易懂]
- java nio 详_java NIO 详解
- 为什么有的人学完Netty 都还不知道BIO|NIO|AIO 的区别?
- 【Netty】NIO 缓冲区 ( Buffer ) 分散 Scattering 与 聚合 Gathering 操作
- ORA-14764: FOR VALUES clause cannot be specified for only one partition ORACLE 报错 故障修复 远程处理
- Java 对象序列化 NIO NIO2 深度解析详解编程语言
- 通过Java NIO 实现文件下载详解编程语言
- Java NIO Channel通道详解编程语言
- 循环Linux中使用For循环的正确方式(linux里的for)
- 目录用For循环创建Linux目录的简单技巧(for循环创建linux)
- MySQL存储过程中使用FOR循环的实现(mysql存储过程for循环)
- 技巧使用Oracle的FOR循环加快编程效率(oracle的for循环)
- Linux下如何优雅地使用For循环(linux下for循环)
- 数据库使用NIO连接Oracle数据库轻松尝试(nio连接oracle)
- 循环Oracle环境下使用For循环的指南(oracle中使用for)
- 遍历Oracle中用For反向遍历字符串的简单示例(oracle中for反向)
- 与Oracle中的FOR语句实现数据删除(oracle中for删除)
- java的nio的使用示例分享