使用场景
任务队列中的 Task 有 3 种典型使用场景
- 用户程序自定义的普通任务 [举例说明]
- 用户自定义定时任务
- 非当前 Reactor 线程调用 Channel 的各种方法
比如这里我们有一个非常耗时长的业务,需要10秒钟才能执行完成。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Thread.sleep(10*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务....",CharsetUtil.UTF_8)); System.out.println("go on...");
}
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|


这样是不行的,是阻塞的。我们需要异步执行。
自定义普通任务
通过ctx.channel().eventLoop().execute(Runnable command)
添加到TaskQueue中

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务1....", CharsetUtil.UTF_8)); } catch (Exception e) { System.out.println("发生异常" + e.getMessage()); } } });
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务2....", CharsetUtil.UTF_8)); } catch (Exception e) { System.out.println("发生异常" + e.getMessage()); } } });
System.out.println("go on...");
}
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|

go on在客户端连接成功后直接运行。而TaskQueue中的任务异步执行,按照所需时间顺序执行。5秒执行第一个,再过5秒后执行下一个。总共耗费10秒。
我们通过Debug查看是否真正添加到了TaskQueue中。
ctx->pipeline->channel->eventLoop->taskQueue
中可以看到大小为2

自定义定时任务
通过ctx.channel().eventLoop().schedule(Runnable command, long delay, TimeUnit unit)
该任务是提交到scheduleTaskQueue
中


成功执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务1....", CharsetUtil.UTF_8)); } catch (Exception e) { System.out.println("发生异常" + e.getMessage()); } } });
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务2....", CharsetUtil.UTF_8)); } catch (Exception e) { System.out.println("发生异常" + e.getMessage()); } } });
ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务3....", CharsetUtil.UTF_8)); } catch (Exception e) { System.out.println("发生异常" + e.getMessage()); } } },5, TimeUnit.SECONDS);
System.out.println("go on...");
}
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
同样我们通过Debug来查看是否存放在scheduleTaskQueue
中。
ctx->pipeline->channel->eventLoop->taskQueue

的确是存放于此的。
非当前Reactor调用Channel

每个客户端的SocketChannel
是不同的。可以使用一个集合管理SocketChannel,再推送消息时,可以将业务加入到各个channel对应到NIOEventLoop到taskQueue
或者scheduleTaskQueue

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
|
public class NettyServer { public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try { ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE,true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { System.out.println("客户SocketChannel hashcode="+socketChannel.hashCode()); socketChannel.pipeline().addLast(new NettyServerHandler()); } });
System.out.println("......服务器 is ready..."); ChannelFuture cf = bootstrap.bind(6688).sync();
cf.channel().closeFuture().sync(); } finally { boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|
Netty模型再说明
Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定 在其上的 socket 网络通道。
NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
- NioEventLoopGroup 下包含多个 NioEventLoop
- 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
- 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
- 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
- 每个 NioChannel 都绑定有一个自己的 ChannelPipeline