handler基本说明

Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等

其中ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。

例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。

ChannelPipeline提供了ChannelHandler链的容器

以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的。同理,对于服务端来说,也是一样的,有数据来就是入站,有数据输出就是出站。

image-20201123171017623

为什么服务端和客户端的handler都是继承SimpleChannelInboundHandler,而没有ChannelOutboundHandler出站类?

实际上当我们在handler中调用ctx.writeAndFlush()方法后,就会将数据交给ChannelOutboundHandler进行出站处理,只是我们没有去定义出站类而已,若有需求可以自己去实现ChannelOutboundHandler出站类。

总结就是客户端和服务端都有出站和入站的操作,形成一个循环链

服务端-------->出站-------->入站-------->客户端 -------->出站-------->入站-------->服务端

编解码器

当Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。

Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。

解码器

ByteToMessageDecoder decode()方法进行解码

image-20201123172449318

1)由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理。

实例

1
2
3
4
5
6
7
8
9
public class ToIntegerDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//判断ByteBuf的字节数是否大于4个字节
if (in.readableBytes() >= 4) {
out.add(in.readInt());//读取Int型数据添加到List中,自动装箱
}
}
}

这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据。

编码器

MessageToByteEncoder Encoder()方法进行编码

image-20201123172403567

image-20201123173540073

实例

使用自定义的编码器和解码器来说明Netty的handler 调用机制

  • 客户端发送long -> 服务器
  • 服务端发送long -> 客户端

服务器

image-20201123202153430

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyServer {

public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());

ChannelFuture channelFuture = bootstrap.bind(7000).sync();

channelFuture.channel().closeFuture().sync();

} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}

image-20201123202618613

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

//添加入站的handler进行解码 MyByteToLongDecoder
pipeline.addLast(new MyByteToLongDecoder());
//出站编码
pipeline.addLast(new MyLongToByteEncoder());
//添加自定义handler
pipeline.addLast(new MyServerHandler());

}
}

image-20201123203702147

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {


@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long" + msg);

//给客户端发送一个long
ctx.writeAndFlush(98765L);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

编码器

继承MessageToByteEncoder,重写encode()编码方法

image-20201123204916471

1
2
3
4
5
6
7
8
9
10
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {

//编码方法
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MessageToByteEncoder encode 被调用");
System.out.println("msg=" + msg);
out.writeLong(msg);
}
}

解码器

继承ByteToMessageDecode,重写decode()解码方法

image-20201123203915543

decode 会根据接受的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多到可读字节为止。

如果list out 不为空,就会将list的内容传递给下一个ChannelInBoundHandler处理,该处理器的方法也会被调用多次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MyByteToLongDecoder extends ByteToMessageDecoder {

/**
*decode 会根据接受的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多到可读字节为止。
* 如果list out 不为空,就会将list的内容传递给下一个ChannelInBoundHandler处理,该处理器的方法也会被调用多次。
*
* @param ctx 上下文对象
* @param in 入站的ByteBuf
* @param out List集合,将解码后的数据传给下一个handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

System.out.println("MyByteToLongDecoder decode 被调用");
//因为long 8个字节,需要判断有8个字节,才能读取一个long
if (in.readableBytes()>=8){
out.add(in.readLong());
}
}
}

客户端

image-20201123203957557

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyClient {

public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new MyClientInitializer());//自定义初始化类

ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();

channelFuture.channel().closeFuture().sync();


} finally {
group.shutdownGracefully();
}


}
}

image-20201123204147327

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {


@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

//加入一个出站的handler,对数据进行一个编码。
pipeline.addLast(new MyLongToByteEncoder());
//出站解码
pipeline.addLast(new MyByteToLongDecoder());
//加入一个自定义对handler,处理业务
pipeline.addLast(new MyClientHandler());
}
}

image-20201123202853744

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器端IP:"+ctx.channel().remoteAddress());
System.out.println("收到服务器消息:"+msg);
}


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
ctx.writeAndFlush(123456L);//发送一个long

}
}

测试

我们先来一张图来方便我们分析结果

Handler调用机制

服务器

image-20201123205645627

(客户端——->服务器):入站。服务器接受客户端发送的数据,解码器首先被调用,接着调用我们自定义的ServerHandler打印数据。

(服务器——->客户端):出站。服务器发送数据给客户端首先调用我们自定义的处理器ServerHandler,接着调用编码器,打印数据。

客户端

image-20201123205713335

(客户端——->服务器):出站。客户端一启动首先调用我们自己编写的ClientHandler发送数据,接着调用编码器编码数据。

(服务器——->客户端):入站。首先使用解码器解码服务器发送的数据,接着传递调用给自定义的ClientHandler。

测试结果与分析结果一致测试成功!

细节

我们修改客户端发送的数据为一个16字节的字符串按UTF-8编码。运行测试。

image-20201123213136422

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器端IP:"+ctx.channel().remoteAddress());
System.out.println("收到服务器消息:"+msg);
}


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//ctx.writeAndFlush(123456L);//发送一个long

//1."abcdabcdabcdabcd" 是16个字节
//2.该处理器的前一个handler是MyLongToByteEncoder没有进行编码处理 为什么?原因在它的父类 MessageToByteEncoder
//因此我们编写 Encoder 是注意传人的数据类型和处理的数据类型一致
ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));

}
}

image-20201123213307154

image-20201123213427984

16字节每次只读Long类型也就是8字节,所以需要读两次。每次解码过后就交给了ServerHandler处理,也就发送了98765L从而也调用了两次编码器。

image-20201123213626062

发送字符串数据没有调用Long类型编码器的编码方法。

后面是接受服务器两次发送的数据流程。

image-20201123213724130

Handler处理细节

不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,否则该handler不会执行相应操作。

在解码器 进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能不一致