zl程序教程

您现在的位置是:首页 >  后端

当前栏目

【转载】example for NIO

nio for 转载 Example
2023-09-14 08:57:28 时间

 

NIO-Socket通讯,为我们解决了server端多线程设计方面的性能/吞吐量等多方面的问题,它提供了以非阻塞模式 + 线程池的方式来解决Server端高并发问题..NIO并不能显著的提升Client-server的通讯性能(其中包括全局性耗时总和,Server物理机资源开销和实际计算量),但是它可以确保Server端在支撑相应的并发量情况下,对物理资源的使用处于可控状态.对于开发者而言,NIO合理的使用了平台(OS/VM/Http协议)的特性并提供了高效的便捷的编程级别的API.

 

为了展示,NIO交互的基本特性,我们模拟了一个简单的场景:Client端向server端建立连接,并持续交付大量数据,Server负载client的数据传输和处理.此程序实例并没有太多的关注异常处理和业务性处理,也没有使用线程池作为server端socket句柄管理,不过你可以简单的修改代码也实现它.

TestMain.java:引导类 ClientControllor.java:client连接处理类,负责队列化数据提交,并负责维护socket句柄. Packet.java:对于读取或者写入的buffer,进行二次封装,使其具有更好的可读性. ServerControllor.java:server端连接处理类,负责接收连接和数据处理 ServerHandler.java:server端连接维护类.

TestMain.java:

 


    public static void main(String[] args) throws Exception{           int port = 30008;           ServerControllor sc = new ServerControllor(port);           sc.start();           Thread.sleep(2000);           ClientControllor cc = new ClientControllor("127.0.0.1", port);           cc.start();           Packet p1 = Packet.wrap("Hello,I am first!");           cc.put(p1);           Packet p2 = Packet.wrap("Hello,I am second!");           cc.put(p2);           Packet p3 = Packet.wrap("Hello,I am thread!");           cc.put(p3);       }  
import java.util.concurrent.BlockingQueue;   import java.util.concurrent.LinkedBlockingQueue;   import java.util.zip.Adler32;   import java.util.zip.Checksum;   public class ClientControllor {       private BlockingQueue Packet  inner = new LinkedBlockingQueue Packet (100);//no any more       private Object lock = new Object();       private InetSocketAddress remote;       private Thread thread = new ClientThread(remote);       public ClientControllor(String host,int port){           remote = new InetSocketAddress(host, port);       }              public void start(){           if(thread.isAlive() || remote == null){               return;           }           synchronized (lock) {               thread.start();           }                                 }       public boolean put(Packet packet){           return inner.offer(packet);       }              public void clear(){           inner.clear();       }              class ClientThread extends Thread {           SocketAddress remote;           SocketChannel channel;           ClientThread(SocketAddress remote){               this.remote = remote;           }           @Override           public void run(){               try{                   try{                       channel = SocketChannel.open();                       channel.configureBlocking(true);                       boolean isSuccess = channel.connect(new InetSocketAddress(30008));                       if(!isSuccess){                           while(!channel.finishConnect()){                               System.out.println("Client is connecting...");                           }                       }                       System.out.println("Client is connected.");   //                  Selector selector = Selector.open();   //                  channel.register(selector, SelectionKey.OP_WRITE);   //                  while(selector.isOpen()){   //                      selector.select();   //                      Iterator SelectionKey  it = selector.selectedKeys().iterator();   //                      while(it.hasNext()){   //                          SelectionKey key = it.next();   //                          it.remove();   //                          if(!key.isValid()){   //                              continue;   //                          }   //                          if(key.isWritable()){   //                              write();   //                          }   //                      }   //                  }                       while(channel.isOpen()){                           write();                       }                   }catch(Exception e){                       e.printStackTrace();                   }finally{                       if(channel != null){                           try{                               channel.close();                           }catch(Exception ex){                               ex.printStackTrace();                           }                       }                   }               }catch(Exception e){                   e.printStackTrace();                   inner.clear();               }           }                      private void write() throws Exception{               Packet packet = inner.take();               synchronized (lock) {                   ByteBuffer body = packet.getBuffer();//                   ByteBuffer head = ByteBuffer.allocate(4);                   head.putInt(body.limit());                   head.flip();                   while(head.hasRemaining()){                       channel.write(head);                   }                   Checksum checksum = new Adler32();                   while(body.hasRemaining()){                       checksum.update(body.get());                   }                   body.rewind();                   while(body.hasRemaining()){                       channel.write(body);                   }                   long cks = checksum.getValue();                   ByteBuffer tail = ByteBuffer.allocate(8);                   tail.putLong(cks);                   tail.flip();                   while(tail.hasRemaining()){                       channel.write(tail);                   }               }                          }       }  
    public String getDataAsString(){           return charset.decode(buffer).toString();       }              public byte[] getData(){           return buffer.array();       }              public ByteBuffer getBuffer(){           return this.buffer;       }                     public static Packet wrap(ByteBuffer buffer){           return new Packet(buffer);       }              public static Packet wrap(String data){           ByteBuffer source = charset.encode(data);           return new Packet(source);       }  
import java.nio.channels.ServerSocketChannel;   import java.nio.channels.SocketChannel;   import java.util.Iterator;   public class ServerControllor {       private int port;       private Thread thread = new ServerThread();;       private Object lock = new Object();       public ServerControllor(){           this(0);       }       public ServerControllor(int port){           this.port = port;       }              public void start(){           if(thread.isAlive()){               return;           }           synchronized (lock) {               thread.start();               System.out.println("Server starting....");           }       }                     class ServerThread extends Thread {           private static final int TIMEOUT = 3000;           private ServerHandler handler = new ServerHandler();           @Override           public void run(){               try{                   ServerSocketChannel channel = null;                   try{                       channel = ServerSocketChannel.open();                       channel.configureBlocking(false);                       channel.socket().setReuseAddress(true);                       channel.socket().bind(new InetSocketAddress(port));                       Selector selector = Selector.open();                       channel.register(selector, SelectionKey.OP_ACCEPT);                       while(selector.isOpen()){                           System.out.println("Server is running,port:" + channel.socket().getLocalPort());                           if(selector.select(TIMEOUT) == 0){                               continue;                           }                           Iterator SelectionKey  it = selector.selectedKeys().iterator();                           while(it.hasNext()){                               SelectionKey key = it.next();                               it.remove();                               if(!key.isValid()){                                   continue;                               }                               if(key.isAcceptable()){                                   accept(key);                               }else if(key.isReadable()){                                   read(key);                               }                           }                       }                   }catch(Exception e){                       e.printStackTrace();                   }finally{                       if(channel != null){                           try{                               channel.close();                           }catch(Exception ex){                               ex.printStackTrace();                           }                       }                   }               }catch(Exception e){                   e.printStackTrace();               }           }                      private void accept(SelectionKey key) throws Exception{               SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();               socketChannel.configureBlocking(true);               //socketChannel.register(key.selector(), SelectionKey.OP_READ);               handler.handle(socketChannel);           }                      private void read(SelectionKey key) throws Exception{               SocketChannel channel = (SocketChannel)key.channel();               //handler.handle(channel);           }       }  
    private static Semaphore semaphore = new Semaphore(Runtime.getRuntime().availableProcessors() + 1);              private static Map SocketChannel,Thread  holder = new HashMap SocketChannel,Thread (32);              @Override       public void handle(SocketChannel channel) {           synchronized (holder) {               if(holder.containsKey(channel)){                   return;               }               Thread t = new ReadThread(channel);               holder.put(channel, t);               t.start();           }       }                     static class ReadThread extends Thread{           SocketChannel channel;           ReadThread(SocketChannel channel){               this.channel = channel;           }           @Override           public void run(){               try{                   semaphore.acquire();                   boolean eof = false;                   while(channel.isOpen()){                       //ByteBuffer byteBuffer = new ByteBuffer(1024);                       ByteBuffer head = ByteBuffer.allocate(4);//int for data-size                       while(true){                           int cb = channel.read(head);                           if(cb == -1){                               throw new RuntimeException("EOF error,data lost!");                           }                           if(isFull(head)){                               break;                           }                       }                       head.flip();                       int dataSize = head.getInt();                       if(dataSize  = 0){                           throw new RuntimeException("Data format error,something lost???");                       }                       ByteBuffer body = ByteBuffer.allocate(dataSize);                       while(true){                           int cb = channel.read(body);                           if(cb == -1){                               throw new RuntimeException("EOF error,data lost!");                           }else if(cb == 0   this.isFull(body)){                               break;                           }                       }                       ByteBuffer tail = ByteBuffer.allocate(8);//int for data-size                       while(true){                           int cb = channel.read(tail);                           if(cb == -1){                               eof = true;                           }                           if(isFull(tail)){                               break;                           }                       }                       tail.flip();                       long sck = tail.getLong();                       Checksum checksum = new Adler32();                       checksum.update(body.array(), 0, dataSize);                       long cck = checksum.getValue();                       if(sck != cck){                           throw new RuntimeException("Sorry,some data lost or be modified,please check!");                       }                       body.flip();                       Packet packet = Packet.wrap(body);                       System.out.println(packet.getDataAsString());                       if(eof){                           break;                       }                   }               }catch(Exception e){                   e.printStackTrace();               }finally{                   if(channel != null){                       try{                           channel.close();                       }catch(Exception ex){                           ex.printStackTrace();                       }                   }                   holder.remove(channel);                   semaphore.release();               }           }                      private boolean isFull(ByteBuffer byteBuffer){               return byteBuffer.position() == byteBuffer.capacity() ? true : false;           }       }  
NIO学习一 NIO相比普通IO提供了功能更为强大、处理数据更快的解决方案。 常用于高性能服务器上。NIO实现高性能处理的原理是使用较少的线程来处理更多的任务 常规io使用的byte[]、char[]进行封装,而NIO采用ByteBuffer类来操作数据,再结合 针对File或socket技术的channel,采用同步非阻塞技术来实现高性能处理,而Netty 正是采用ByteBuffer(缓冲区)、Channel(通道)、Selector(选择器)进行封装的。 因此我们需要先了解NIO相关的知识。
NIO 所谓NIO,就是New IO的缩写。是从JDK 1.4开始引入的全新的IO API。NIO将以更高效的方式进行文件的读写操作,可完全代替传统的IO API使用。而且JDK 1.7对NIO又进行了更新,可以称作NIO 2.0。
netty和nio netty是一个nio客户机-服务器框架,它简化了tcp和udp网络编程,相对于java传统nio,netty还屏蔽了操作系统的差异性,并且兼顾了性能。 Channel channel封装了对socket的原子操作,实质是对socket的封装和扩展。
bio和nio 操作系统为了限制程序的数据访问,来防止获取其他进程或外围设备数据,将CPU划分为用户态和内核态: 内核态(Kernel Mode):cpu可以访问内存的所有数据,包括外围设备,例如硬盘,网卡,cpu也可以将自己从一个程序切换到另一个程序。
在进入NIO之前,先回顾一下Java标准IO方式实现的网络server端: public class IOServerThreadPool { private static final Logger LOGGER = LoggerFactory.
使用netty的NIO来实现一个简单的TimeServer 只想以此来加深java的NIO这方面的知识点~~~ 参考书籍《netty权威指南》(第二版) 这本书,第一二三章,前面讲java的bio,nio,nio2, 讲得蛮好的。
nio之netty5应用 1、netty5和netty4的区别不是很大,但是与netty3差别还是有的。这里不介绍netty4,因为和netty5的方式都差不多。netty5的复杂性相对于netty3要多很多了。基本上架构都被重构了。