handler基本说明
Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等
其中ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。
例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的。
ChannelPipeline提供了ChannelHandler链的容器
以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的。同理,对于服务端来说,也是一样的,有数据来就是入站,有数据输出就是出站。

为什么服务端和客户端的handler都是继承SimpleChannelInboundHandler,而没有ChannelOutboundHandler出站类?
实际上当我们在handler中调用ctx.writeAndFlush()方法后,就会将数据交给ChannelOutboundHandler进行出站处理,只是我们没有去定义出站类而已,若有需求可以自己去实现ChannelOutboundHandler出站类。
总结就是客户端和服务端都有出站和入站的操作,形成一个循环链
服务端-------->出站-------->入站-------->客户端 -------->出站-------->入站-------->服务端
编解码器
当Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。
Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler
或者ChannelOutboundHandler
接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。
解码器
ByteToMessageDecoder decode()
方法进行解码

1)由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理。
实例
1 2 3 4 5 6 7 8 9
| public class ToIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() >= 4) { out.add(in.readInt()); } } }
|
这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是否具有足够的数据。
编码器
MessageToByteEncoder Encoder()
方法进行编码


实例
使用自定义的编码器和解码器来说明Netty的handler 调用机制
- 客户端发送long -> 服务器
- 服务端发送long -> 客户端
服务器

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class MyServer {
public static void main(String[] args) throws InterruptedException { NioEventLoopGroup boosGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boosGroup,workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer());
ChannelFuture channelFuture = bootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally { boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
} }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyByteToLongDecoder()); pipeline.addLast(new MyLongToByteEncoder()); pipeline.addLast(new MyServerHandler());
} }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("从客户端" + ctx.channel().remoteAddress() + "读取到long" + msg);
ctx.writeAndFlush(98765L); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
编码器
继承MessageToByteEncoder
,重写encode()
编码方法

1 2 3 4 5 6 7 8 9 10
| public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System.out.println("MessageToByteEncoder encode 被调用"); System.out.println("msg=" + msg); out.writeLong(msg); } }
|
解码器
继承ByteToMessageDecode
,重写decode()
解码方法

decode 会根据接受的数据,被调用多次,直到确定没有新的元素被添加到list,或者ByteBuf没有更多到可读字节为止。
如果list out 不为空,就会将list的内容传递给下一个ChannelInBoundHandler处理,该处理器的方法也会被调用多次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class MyByteToLongDecoder extends ByteToMessageDecoder {
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder decode 被调用"); if (in.readableBytes()>=8){ out.add(in.readLong()); } } }
|
客户端

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class MyClient {
public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new MyClientInitializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
} finally { group.shutdownGracefully(); }
} }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyLongToByteEncoder()); pipeline.addLast(new MyByteToLongDecoder()); pipeline.addLast(new MyClientHandler()); } }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("服务器端IP:"+ctx.channel().remoteAddress()); System.out.println("收到服务器消息:"+msg); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("MyClientHandler 发送数据"); ctx.writeAndFlush(123456L);
} }
|
测试
我们先来一张图来方便我们分析结果

服务器

(客户端——->服务器):入站。服务器接受客户端发送的数据,解码器首先被调用,接着调用我们自定义的ServerHandler打印数据。
(服务器——->客户端):出站。服务器发送数据给客户端首先调用我们自定义的处理器ServerHandler,接着调用编码器,打印数据。
客户端

(客户端——->服务器):出站。客户端一启动首先调用我们自己编写的ClientHandler发送数据,接着调用编码器编码数据。
(服务器——->客户端):入站。首先使用解码器解码服务器发送的数据,接着传递调用给自定义的ClientHandler。
测试结果与分析结果一致测试成功!
细节
我们修改客户端发送的数据为一个16字节的字符串按UTF-8编码。运行测试。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception { System.out.println("服务器端IP:"+ctx.channel().remoteAddress()); System.out.println("收到服务器消息:"+msg); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("MyClientHandler 发送数据");
ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd", CharsetUtil.UTF_8));
} }
|


16字节每次只读Long类型也就是8字节,所以需要读两次。每次解码过后就交给了ServerHandler处理,也就发送了98765L
从而也调用了两次编码器。

发送字符串数据没有调用Long类型编码器的编码方法。
后面是接受服务器两次发送的数据流程。


不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,否则该handler不会执行相应操作。
在解码器 进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能不一致