netty-demo
服务端
//创建BossGroup和WorkerGroup
//bossGroup只处理连接请求,真正与客户端业务处理会交给workerGroup
//这两个线程组都是无限循环
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);//指定线程数1
NioEventLoopGroup workerGroup = new NioEventLoopGroup();//默认线程数cpu核心数x2
//创建服务器端的启动对象,配置启动参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置
bootstrap
.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new NettyServerHandler());//自定义处理器
}
});//给workerGroup的EventLoop对应的管道设置处理器
System.out.println("服务器准备好了...");
//启动服务器, 绑定端口,并设置同步
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
//关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
自定义处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件(这里可以读取客户端发送的消息)
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx = "+ctx);
//将msg转成一个ByteBuf(netty提供的)
ByteBuf buf = (ByteBuf)msg;
System.out.println("客户端发送消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将数据写入缓冲区并刷新(write+flush)
//Unpooled.copiedBuffer进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello...", CharsetUtil.UTF_8));
}
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//关闭通道
ctx.channel().close();
}
}
客户端
//客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
//创建客户端启动对象
//注意:客户端使用的不是ServerBootstrap是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//相关设置
bootstrap
.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());//加入自定义处理器
}
});
System.out.println("客户端 ok...");
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
//关闭
eventExecutors.shutdownGracefully();
自定义处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪会触发该方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client:"+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", CharsetUtil.UTF_8));
}
/**
* 当通道有读取事件时会触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器地址:"+ctx.channel().remoteAddress());
}
/**
* 发生异常
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();//打印异常
ctx.close();//关闭
}
}