netty-任务队列
使用场景
- 用户程序自定义的普通任务
- 用户自定义定时任务
- 非当前reactor线程调用channel的各种方法
用户程序自定义的普通任务
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件(这里可以读取客户端发送的消息)
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//向taskQueue中添加一个任务
ctx.channel().eventLoop().execute(new Runnable() {
@SneakyThrows
@Override
public void run() {
//此处有一个耗时任务
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,abc",CharsetUtil.UTF_8));
}
});
//再向taskQueue中添加一个任务
//和上一个是在同一个线程中串行执行的,第二个回应会在30s之后返回
ctx.channel().eventLoop().execute(new Runnable() {
@SneakyThrows
@Override
public void run() {
//此处有一个耗时任务
Thread.sleep(20*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,123",CharsetUtil.UTF_8));
}
});
System.out.println("ok...");
}
}
用户自定义定时任务
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将任务提交到scheduleTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
@SneakyThrows
@Override
public void run() {
//此处有一个耗时任务
Thread.sleep(5*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,dqn",CharsetUtil.UTF_8));
}
},5, TimeUnit.SECONDS);//延迟5秒执行
System.out.println("ok...");
}
}
非当前reactor线程调用channel的各种方法
例如在推送系统的业务线程里,根据用户的标识,找到对应的channel引用,然后调用write方法向该用户推送消息,就会进入到这种场景,最终的write会提交到任务队列中最后被异步消费