使用场景

任务队列中的 Task 有 3 种典型使用场景

  1. 用户程序自定义的普通任务 [举例说明]
  2. 用户自定义定时任务
  3. 非当前 Reactor 线程调用 Channel 的各种方法

比如这里我们有一个非常耗时长的业务,需要10秒钟才能执行完成。

image-20201108202610610

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//比如这里我们有一个非常耗时长的业务->异步执行->提交该channel对应的NIOEventLoop的taskQueue中
Thread.sleep(10*1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务....",CharsetUtil.UTF_8));
System.out.println("go on...");


//System.out.println("服务器读取线程"+Thread.currentThread().getName());
//System.out.println("server ctx = "+ ctx);
//System.out.println("看看channel和pipeline的关系");
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline();//本质是一个双向链表,出站入站
////将msg转成一个ByteBuf
////ByteBuf是Netty提供的,不是NIO的ByteBuffer
//ByteBuf buf = (ByteBuf) msg;
//System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
//System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}

//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

//处理异常,一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

image-20201108202943825

image-20201108203042544

这样是不行的,是阻塞的。我们需要异步执行。

自定义普通任务

通过ctx.channel().eventLoop().execute(Runnable command)添加到TaskQueue中

image-20201108203504143

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//比如这里我们有一个非常耗时长的业务->异步执行->提交该channel对应的NIOEventLoop的taskQueue中
//Thread.sleep(10*1000);
//ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务....",CharsetUtil.UTF_8));
//System.out.println("go on...");

//解决方案1 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务1....", CharsetUtil.UTF_8));
} catch (Exception e) {
System.out.println("发生异常" + e.getMessage());
}
}
});


ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务2....", CharsetUtil.UTF_8));
} catch (Exception e) {
System.out.println("发生异常" + e.getMessage());
}
}
});

System.out.println("go on...");

//System.out.println("服务器读取线程"+Thread.currentThread().getName());
//System.out.println("server ctx = "+ ctx);
//System.out.println("看看channel和pipeline的关系");
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline();//本质是一个双向链表,出站入站
////将msg转成一个ByteBuf
////ByteBuf是Netty提供的,不是NIO的ByteBuffer
//ByteBuf buf = (ByteBuf) msg;
//System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
//System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}

//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

//处理异常,一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

2020-11-08 20.55.46

go on在客户端连接成功后直接运行。而TaskQueue中的任务异步执行,按照所需时间顺序执行。5秒执行第一个,再过5秒后执行下一个。总共耗费10秒。

我们通过Debug查看是否真正添加到了TaskQueue中。

ctx->pipeline->channel->eventLoop->taskQueue中可以看到大小为2

image-20201108210305203

自定义定时任务

通过ctx.channel().eventLoop().schedule(Runnable command, long delay, TimeUnit unit)该任务是提交到scheduleTaskQueue

image-20201108211500080

2020-11-08 21.15.49

成功执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//比如这里我们有一个非常耗时长的业务->异步执行->提交该channel对应的NIOEventLoop的taskQueue中
//Thread.sleep(10*1000);
//ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务....",CharsetUtil.UTF_8));
//System.out.println("go on...");

//解决方案1 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务1....", CharsetUtil.UTF_8));
} catch (Exception e) {
System.out.println("发生异常" + e.getMessage());
}
}
});


ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务2....", CharsetUtil.UTF_8));
} catch (Exception e) {
System.out.println("发生异常" + e.getMessage());
}
}
});

//用户自定义定时任务 -> 该任务是提交到 scheduleTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端超长任务3....", CharsetUtil.UTF_8));
} catch (Exception e) {
System.out.println("发生异常" + e.getMessage());
}
}
},5, TimeUnit.SECONDS);

System.out.println("go on...");

//System.out.println("服务器读取线程"+Thread.currentThread().getName());
//System.out.println("server ctx = "+ ctx);
//System.out.println("看看channel和pipeline的关系");
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline();//本质是一个双向链表,出站入站
////将msg转成一个ByteBuf
////ByteBuf是Netty提供的,不是NIO的ByteBuffer
//ByteBuf buf = (ByteBuf) msg;
//System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
//System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}

//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

//处理异常,一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

同样我们通过Debug来查看是否存放在scheduleTaskQueue中。

ctx->pipeline->channel->eventLoop->taskQueue

image-20201108211938563

的确是存放于此的。

非当前Reactor调用Channel

image-20201108212347118

每个客户端的SocketChannel是不同的。可以使用一个集合管理SocketChannel,再推送消息时,可以将业务加入到各个channel对应到NIOEventLoop到taskQueue或者scheduleTaskQueue

image-20201108212731847

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 创建BoosGroup和WorkerGroup
* 1.boosGroup只处理连接请求,真正和客户端业户处理会交给workerGroup完成
* 2.两个都是无限循环
* 3.bossGroup和workerGroup含有的子线程(NioEventLoop)的个数 默认实际 cpu核数 * 2
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {

//创建BoosGroup和WorkerGroup
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
//创建服务器端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();

//使用链式编程链进行设置
bootstrap.group(boosGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
//.handler(null)//该handler对应bossGroup,childHandler对应workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//可以使用一个集合管理SocketChannel,再推送消息时,可以将业务加入到各个channel对应到NIOEventLoop到taskQueue或者scheduleTaskQueue
System.out.println("客户SocketChannel hashcode="+socketChannel.hashCode());
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们的workerGroup的EventLoop对应的管道设置处理器

System.out.println("......服务器 is ready...");
//启动服务器并绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6688).sync();

//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

Netty模型再说明

Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。

NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定 在其上的 socket 网络通道。

NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责

  • NioEventLoopGroup 下包含多个 NioEventLoop
  • 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
  • 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
  • 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
  • 每个 NioChannel 都绑定有一个自己的 ChannelPipeline