netty-自定义编解码器
编码器
//父类MessageToByteEncoder中会判断,当前msg是不是应该处理的类型,如果不是则不执行
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
//编码方法
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder-encode被调用");
System.out.println("msg="+msg);
out.writeLong(msg);
}
}
解码器
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
* 此方法会根据接收的数据,被调用多次,直到没有新的数据被添加到list
* 或者bytebuf没有更多的可读数据为止
* 如果list out不为空,就会将list的内容传递给下一个
* @param ctx 上下文
* @param in 入栈的bytebuf
* @param out list集合,将解码后的数据传给下一个handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder-decode 被调用");
//一个long是8个字节,需要读8个字节,才能读取一个long
if (in.readableBytes() >= 8){
out.add(in.readLong());
}
}
}
使用
服务端
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());//自定义初始化器
ChannelFuture cf = serverBootstrap.bind(7000).sync();
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
初始化器
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//入栈handler进行解码
pipeline.addLast(new MyByteToLongDecoder());
//出栈handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler处理业务逻辑
pipeline.addLast(new MyServerHandler());
}
}
业务处理器
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("读取到数据:"+msg);
//回复客户端
ctx.writeAndFlush(7654321L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端
EventLoopGroup group = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new MyClientInitializer());//自定义初始化器
ChannelFuture cf = bootstrap
.connect("localhost",7000).sync();
cf.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
初始化器
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new MyLongToByteEncoder()) //加出栈handler 对数据进行编码
.addLast(new MyByteToLongDecoder()) //加入栈handler 对数据进行解码
.addLast(new MyClientHandler()); //自定义handler处理业务逻辑
}
}
业务处理器
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("收到服务器消息:"+msg);
}
//发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler-channelActive 发送数据");
ctx.writeAndFlush(1234567L);
// ctx.writeAndFlush(Unpooled.copiedBuffer("abcdefgabcdefg",CharsetUtil.UTF_8));
}
}
ReplayingDecoder
ReplayingDecoder扩展了ByteToMessageDecoder,使用这个类我们不需要调用readableBytes方法。泛型指定了用户状态管理的类型(Void代表不需要状态管理)
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//不需要判断数据是否足够读取,内部会进行判断
out.add(in.readLong());
}
}
注意:
- 并不是所有的ByteBuf操作都被支持,如果调用了一个不被支持的方法,会抛出异常
- 在某些情况下会慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂,消息会被拆成多个碎片,速度变慢
其他编解码器
解码器
- LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制符
\n或者\r\n
作为分隔符来解析数 - DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符
- HttpObjectDecoder:一个http数据的解码器
- LengthFieldBasedFrameDecoder:通过指定长度来标识整包信息这样就可以自动的处理粘包和半包消息
编码器
- 和解码器对应