zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Netty(二)之整合Marshalling传输实体类

2023-03-14 22:57:46 时间

pom


        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha1</version>
        </dependency>
 
        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.3.14.GA</version>
        </dependency>
 
 
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
            <scope>provided</scope>
        </dependency>
 
        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-river -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-river</artifactId>
            <version>2.0.5.Final</version>
        </dependency>


自定义实体类


使用lombok添加的get、set、无参构造方法


package jbossmarshalling;
 
import lombok.Data;
 
import java.io.Serializable;
import java.util.Date;
 
/**
 * @author CBeann
 * @create 2019-08-31 11:05
 */
@Data
public class Book implements Serializable {
 
    private Integer id;
    private String name;
    private Date date;
 
 
 
 
}


MarshallingCodeCFactory编解码工具类


固定格式,类似工具类


package jbossmarshalling;
 
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
 
/**
 * @author CBeann
 * @create 2019-08-31 14:36
 */
public final class MarshallingCodeCFactory {
 
    //创建JBoss Marshalling解码器MarshallingDecoder
    public static MarshallingDecoder buildMarshallingDecoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }
 
    //创建JBoss Marshalling编码器MarshallingEncoder
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}


TimeServerHandler


package jbossmarshalling;
 
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
 
/**
 * @author CBeann
 * @create 2019-08-27 18:31
 */
public class TimeServerHandler extends ChannelHandlerAdapter {
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
        //服务器读客户端发送来的数据
 
        Book body = (Book) msg;
        System.out.println("The TimeServer receive :" + body);
 
 
        //服务器向客户端回应请求
        Book response = new Book();
        response.setId(11);
        response.setName("hello");
        ctx.writeAndFlush(response);
 
    }
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将消息发送队列中的消息写入到SocketChannel中发送给对方
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
 
 
}


TimeServer


package jbossmarshalling;
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
 
/**
 * @author CBeann
 * @create 2019-08-27 18:22
 */
public class TimeServer {
 
 
    public static void main(String[] args) throws Exception {
        int port = 8080;
        //配置服务器端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //重点
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(new TimeServerHandler());
                        }
                    });
            //绑定端口
            ChannelFuture f = b.bind(port).sync();
            //等待服务端监听端口关闭
            f.channel().closeFuture().sync();
 
 
        } catch (Exception e) {
 
        } finally {
            //优雅关闭,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}


TimeClientHandler


package jbossmarshalling;
 
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
 
/**
 * @author CBeann
 * @create 2019-08-27 18:47
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
 
 
    //客户端读取服务器发送的数据
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
 
            Book body = (Book) msg;
            System.out.println("Now is:" + body);
        } catch (Exception e) {
 
        } finally {
            //标配
            ReferenceCountUtil.release(msg);
        }
 
 
    }
 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将消息发送队列中的消息写入到SocketChannel中发送给对方
        ctx.flush();
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}


TimeClient


package jbossmarshalling;
 
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
 
import java.util.Date;
 
/**
 * @author CBeann
 * @create 2019-08-27 18:43
 */
public class TimeClient {
 
 
    public static void main(String[] args) throws Exception {
        int port = 8080;
        String host = "127.0.0.1";
        //配置客户端NIO线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            //发起异步连接操作
            ChannelFuture f = b.connect(host, port).sync();
 
            Book book = new Book();
            book.setId(1);
            book.setName("十万个为什么");
            book.setDate(new Date());
 
 
            //f.channel().writeAndFlush(Unpooled.copiedBuffer("您好".getBytes()));
            f.channel().writeAndFlush(book);
            //Thread.sleep0);//防止TCP粘包
 
 
            //等待客户端链路关闭
            f.channel().closeFuture().sync();
 
        } catch (Exception e) {
 
        } finally {
            //优雅关闭
            group.shutdownGracefully();
        }
    }
}


出现的问题


1)代码没有问题,就是客户端服务器端不交互


依赖有问题


 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha1</version>
        </dependency>
 
        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-serial -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.3.14.GA</version>
        </dependency>
 
 
        <!-- https://mvnrepository.com/artifact/org.jboss.marshalling/jboss-marshalling-river -->
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-river</artifactId>
            <version>2.0.5.Final</version>
        </dependency>


结果


亲测有效