netty-protobuf传输多种类型
proto文件
syntax = "proto3";
option optimize_for = SPEED; //加快解析
option java_package = "com.dqn.netty"; //指定生成到哪个包下
option java_outer_classname = "MyDataInfo"; //外部类名称
//protobuf 可以使用message管理其他的message
message MyMessage{
//定义一个枚举类型
enum DataType {
StudentType = 0; //在proto3要求enum的属性编号从0开始
WorkerType = 1;
}
DataType data_type = 1;//用data_type来标识传的是哪一个枚举类型
//表示每次枚举类型最多只能出现其中的一个,节省空间
oneof dataBody{
Student student = 2;
Worker worker = 3;
}
}
message Student{
int32 id = 1; //Student类的属性
string name = 2;
}
message Worker{
string name = 1;
int32 age = 2;
}
服务端
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);//指定线程数1
NioEventLoopGroup workerGroup = new NioEventLoopGroup();//默认线程数cpu核心数x2
//创建服务器端的启动对象,配置启动参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置
bootstrap
.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()))
.addLast(new NettyServerHandler());//自定义处理器
}
});//给workerGroup的EventLoop对应的管道设置处理器
//启动服务器, 绑定端口,并设置同步
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
//关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
处理器
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
//根据dataType来显示不同的信息
MyDataInfo.MyMessage.DataType datType = msg.getDataType();
if (datType == MyDataInfo.MyMessage.DataType.StudentType){
MyDataInfo.Student student = msg.getStudent();
System.out.println("id="+student.getId()+" name="+student.getName());
}else {
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("name="+worker.getName()+" age="+worker.getAge());
}
}
}
客户端
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
//创建客户端启动对象
//注意:客户端使用的不是ServerBootstrap是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//相关设置
bootstrap
.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ProtobufEncoder())
.addLast(new NettyClientHandler());//加入自定义处理器
}
});
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
//关闭
eventExecutors.shutdownGracefully();
处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪会触发该方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机发送随机对象
int random = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if (0 == random){
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.StudentType)
.setStudent(MyDataInfo.Student.newBuilder()
.setId(3)
.setName("zhangsan")
.build())
.build();
}else {
myMessage = MyDataInfo.MyMessage.newBuilder()
.setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
.setWorker(MyDataInfo.Worker.newBuilder()
.setAge(20)
.setName("lisi")
.build())
.build();
}
//发送
ctx.writeAndFlush(myMessage);
}
}