zl程序教程

您现在的位置是:首页 >  其他

当前栏目

netty接收大文件的方法

文件方法 接收 Netty
2023-09-27 14:23:13 时间

参考:http://blog.csdn.net/linuu/article/details/51371595

https://www.jianshu.com/p/a0a51fd79f62

netty默认是只能接收1024个字节,但是我们要传输大文件怎么办?

上代码:

改之后服务端:

package server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.eshore.ismp.hbinterface.service.BizCommonService;
import com.eshore.ismp.hbinterface.util.ConfigLoadUtil;

public class SpsServer {
	 private static final Logger logger = Logger.getLogger(SpsServer.class);  
	    private static int PORT = 10001;  
	    /**用于分配处理业务线程的线程组个数 */  
	    protected static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors()*2; //默认  
	    /** 业务出现线程大小*/  
	    protected static final int BIZTHREADSIZE = 4;  
	        /* 
	     * NioEventLoopGroup实际上就是个线程池, 
	     * NioEventLoopGroup在后台启动了n个NioEventLoop来处理Channel事件, 
	     * 每一个NioEventLoop负责处理m个Channel, 
	     * NioEventLoopGroup从NioEventLoop数组里挨个取出NioEventLoop来处理Channel 
	     */  
	    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE);  
	    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZTHREADSIZE);  
	      
	    protected static void run(final BizCommonService bizCommonService) throws Exception {  
	    	String PORTs=ConfigLoadUtil.getValue("toSpsServerPort");
	    	PORT=Integer.parseInt(PORTs);
	    	logger.info("PORT IS:"+PORT);
	        ServerBootstrap b = new ServerBootstrap();  
	        b.group(bossGroup, workerGroup);  
	        b.channel(NioServerSocketChannel.class);  
	        b.childHandler(new ChannelInitializer<SocketChannel>() {  
	            @Override  
	            public void initChannel(SocketChannel ch) throws Exception {  
	                ChannelPipeline pipeline = ch.pipeline();  
	               /* pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));  
	                pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));  */
	                ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
					pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2048,delimiter));
	                pipeline.addLast("decoder", new StringDecoder(Charset.forName("GBK")));  
	                pipeline.addLast("encoder", new StringEncoder(Charset.forName("GBK")));
	                pipeline.addLast(new SpsServerHandler(bizCommonService));  
	            }  
	        });  
	  
	        b.bind(PORT).sync();  
	        logger.info("TCP服务器已启动");  
	    }  
	      
	    protected static void shutdown() {  
	        workerGroup.shutdownGracefully();  
	        bossGroup.shutdownGracefully();  
	    }  
	  
	    public static void main(String[] args) throws Exception {  
	    	try{
	    		ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
	    				new String[] { "applicationContext.xml" });
	    		context.start();
	    		BizCommonService bizCommonService = (BizCommonService) context.getBean("bizCommonService");
	    		SpsServer.run(bizCommonService);  
	    	}catch(Exception e){
	    		logger.error("start sps interface server error:",e);
	    		System.exit(-1);
	    	}
	    }  
}

改之后客户端:

 

package fourNoBlocking;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;


/**
 * 
 * 发送报文给客户端
 * 
 * 
 * @date 2016年12月14日 上午11:56:27
 * @since 1.0
 */
public class SendClient {

    private static final String ENCODING = "GBK";

    public static String send(String ip, int port, String sendStr, int timeout) {
        long start = System.currentTimeMillis();
        System.out.println(sendStr.length());
        if (sendStr == null || "".equals(sendStr)) {
            return "str is null";
        }
        Socket client = null;
        OutputStream stream = null;
        InputStream is = null;

        try {
            client = new Socket();
            InetSocketAddress address = new InetSocketAddress(ip, port);
            client.connect(address);

            timeout = timeout >= 0 ? timeout : 3500;
            client.setSoTimeout(timeout);
            stream = client.getOutputStream();
            is = client.getInputStream();

            int len = 0;

            len = sendStr.getBytes(ENCODING).length;
            ByteBuffer buf = ByteBuffer.allocate(len);
            byte[] bytes = sendStr.getBytes(ENCODING);
            buf.put(bytes);
            stream.write(buf.array(), 0, len);
            stream.flush();
            String res = "";
            int i = 0;
            byte[] b = new byte[6555];
            while ((i = is.read(b)) != -1) {
                res = new String(b, 0, i);
                System.out.println(res);
                break;
            }
            long end = System.currentTimeMillis();


            return res;
        } catch (Exception e) {
            StringBuilder strBuilder = new StringBuilder();
            strBuilder.append("error send message").append(e.getMessage()).append("&errorID=")
            .append(System.currentTimeMillis());
            return strBuilder.toString();
        } finally {
            if (client != null) {
                try {
                    client.close();
                } catch (IOException e) {
                    StringBuilder strBuilder = new StringBuilder();
                    strBuilder.append("error send message").append(e.getMessage()).append("&errorID=")
                    .append(System.currentTimeMillis());
                }
            }

            if (stream != null) {
                try {
                    stream.close();
                } catch (IOException e) {
                    StringBuilder strBuilder = new StringBuilder();
                    strBuilder.append("error send message").append(e.getMessage()).append("&errorID=")
                    .append(System.currentTimeMillis());
                }
            }
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                    StringBuilder strBuilder = new StringBuilder();
                    strBuilder.append("error send message").append(e.getMessage()).append("&errorID=")
                    .append(System.currentTimeMillis());
                }
            }
        }
    }
    
    public static void main(String[] args) {
    	String msg="";
        msg="FFFF76623634010100102700170103IBSS017555      000000021800100023402287248808*766236340100200001178400003001785000030217860000302110000004075510100020SZ2000000054121442461020001241324186148310300593PM_DJDHHM||83456517||001#$PM_HYLX||0||001#$BA_MSMAN||海豚||001#$PM_DJQYYB||518000||001#$PM_DJQYMC||深圳市福田区人力资源服务中心||001#$PM_BHHM||83456517||001#$PM_DJQYDZ||福田区福强路深圳文化创意园世纪工艺品文化广场309栋B座1-3层||001#$PM_SFZDXY||XY02||001#$PM_DJKHXX||||001#$BA_MSDEPTNAME||12||001#$PM_DLS||DSL6||001#$PM_YWSLLB||SLLB01||001#$PM_SLDYSLSH||0||001#$PM_JFQ||01||001#$PM_DJHMGS||1||001#$PM_SRFJ||2||001#$PM_JFJG||1||001#$PM_YZ||30||001#$PM_DXFSSL||100||001#$PB_BILLINGTYPE||000000||005#$PB_USERTYPE||100002||005#$PB_USERCHAR||JFSX01||005#$BEGIN_DATE||20170607||005#$END_DATE||||005#$10400014DXMP214688722910700016号百信息服务中心10800010122810070411400006徐冬生115000088291816511600110114+企业名片行业版包月套餐,114+短信名片包月套餐_定价计划,114+企业名片行业版包月套餐赠送3个月套餐外等额话费优惠11700017755KH000293285120\t";
    	String x=SendClient.send("127.0.0.1", 10001, msg, 3500);
    	System.out.println("return string:"+x);
	}

}

处理类:

package server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.eshore.ismp.hbinterface.service.BizCommonService;

public class SpsServerHandler extends SimpleChannelInboundHandler<Object> {

    private static final Logger logger = LoggerFactory.getLogger(SpsServerHandler.class);
    private BizCommonService bizCommonService;
    public SpsServerHandler(){}
    public SpsServerHandler(BizCommonService bizCommonService){
    	this.bizCommonService=bizCommonService;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info("SERVER接收到消息 msg:{}",msg);
        long start = System.currentTimeMillis();
        boolean result = bizCommonService.sendOperToCacheAysn(String.valueOf(msg));
    	/**
    	 * step 3 : 创建响应报文
    	 */
		String res = bizCommonService.createResponseStr(String.valueOf(msg),result);
		long end = System.currentTimeMillis();
        logger.debug("SpsServer request:{} res:{} time cost:{}ms",String.valueOf(msg),res,(end-start));
        ctx.channel().writeAndFlush(res);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
            Throwable cause) throws Exception {
        logger.warn("Unexpected exception from downstream.", cause);
        ctx.close();
    }

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		logger.info("client closed:"+ctx.channel().hashCode());
		super.channelInactive(ctx);
	}
}

输出:

 length:1027

 服务端增加了:

ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
					pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2048,delimiter));

客户端报文增加了

\t