使用场景
任务队列中的 Task 有 3 种典型使用场景
- 用户程序自定义的普通任务 [举例说明]
- 用户自定义定时任务
- 非当前 Reactor 线程调用 Channel 的各种方法
比如这里我们有一个非常耗时长的业务,需要10秒钟才能执行完成。

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 
 | 
 
 
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
 
 
 
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
 
 Thread.sleep(10*1000);
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务....",CharsetUtil.UTF_8));
 System.out.println("go on...");
 
 
 
 
 
 
 
 
 
 
 
 
 }
 
 
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
 }
 
 
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 ctx.close();
 }
 }
 
 | 


这样是不行的,是阻塞的。我们需要异步执行。
自定义普通任务
通过ctx.channel().eventLoop().execute(Runnable command)添加到TaskQueue中

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 
 | 
 
 
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
 
 
 
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
 
 
 
 
 
 
 ctx.channel().eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 try {
 Thread.sleep(5 * 1000);
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务1....", CharsetUtil.UTF_8));
 } catch (Exception e) {
 System.out.println("发生异常" + e.getMessage());
 }
 }
 });
 
 
 ctx.channel().eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 try {
 Thread.sleep(5 * 1000);
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务2....", CharsetUtil.UTF_8));
 } catch (Exception e) {
 System.out.println("发生异常" + e.getMessage());
 }
 }
 });
 
 System.out.println("go on...");
 
 
 
 
 
 
 
 
 
 
 
 }
 
 
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
 }
 
 
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 ctx.close();
 }
 }
 
 | 

go on在客户端连接成功后直接运行。而TaskQueue中的任务异步执行,按照所需时间顺序执行。5秒执行第一个,再过5秒后执行下一个。总共耗费10秒。
我们通过Debug查看是否真正添加到了TaskQueue中。
ctx->pipeline->channel->eventLoop->taskQueue中可以看到大小为2

自定义定时任务
通过ctx.channel().eventLoop().schedule(Runnable command, long delay, TimeUnit unit)该任务是提交到scheduleTaskQueue中


成功执行。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 
 | 
 
 
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
 
 
 
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
 
 
 
 
 
 
 ctx.channel().eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 try {
 Thread.sleep(5 * 1000);
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务1....", CharsetUtil.UTF_8));
 } catch (Exception e) {
 System.out.println("发生异常" + e.getMessage());
 }
 }
 });
 
 
 ctx.channel().eventLoop().execute(new Runnable() {
 @Override
 public void run() {
 try {
 Thread.sleep(5 * 1000);
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务2....", CharsetUtil.UTF_8));
 } catch (Exception e) {
 System.out.println("发生异常" + e.getMessage());
 }
 }
 });
 
 
 ctx.channel().eventLoop().schedule(new Runnable() {
 @Override
 public void run() {
 try {
 Thread.sleep(5 * 1000);
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务3....", CharsetUtil.UTF_8));
 } catch (Exception e) {
 System.out.println("发生异常" + e.getMessage());
 }
 }
 },5, TimeUnit.SECONDS);
 
 System.out.println("go on...");
 
 
 
 
 
 
 
 
 
 
 
 }
 
 
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
 }
 
 
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 ctx.close();
 }
 }
 
 | 
同样我们通过Debug来查看是否存放在scheduleTaskQueue中。
ctx->pipeline->channel->eventLoop->taskQueue

的确是存放于此的。
非当前Reactor调用Channel

每个客户端的SocketChannel是不同的。可以使用一个集合管理SocketChannel,再推送消息时,可以将业务加入到各个channel对应到NIOEventLoop到taskQueue或者scheduleTaskQueue

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 
 | 
 
 
 
 
 public class NettyServer {
 public static void main(String[] args) throws InterruptedException {
 
 
 NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
 NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
 
 try {
 
 ServerBootstrap bootstrap = new ServerBootstrap();
 
 
 bootstrap.group(boosGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 128)
 .childOption(ChannelOption.SO_KEEPALIVE,true)
 
 .childHandler(new ChannelInitializer<SocketChannel>() {
 
 @Override
 protected void initChannel(SocketChannel socketChannel) throws Exception {
 
 System.out.println("客户SocketChannel hashcode="+socketChannel.hashCode());
 socketChannel.pipeline().addLast(new NettyServerHandler());
 }
 });
 
 System.out.println("......服务器 is ready...");
 
 ChannelFuture cf = bootstrap.bind(6688).sync();
 
 
 cf.channel().closeFuture().sync();
 } finally {
 boosGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
 }
 
 | 
Netty模型再说明
Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。 
NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定 在其上的 socket 网络通道。
NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
- NioEventLoopGroup 下包含多个 NioEventLoop
- 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue 
- 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel 
- 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
- 每个 NioChannel 都绑定有一个自己的 ChannelPipeline