Netty快速入门实例 TCP 服务
Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
服务器可以回复消息给客户端 “hello, 客户端~”
首先导入依赖
| 12
 3
 4
 5
 
 | <dependency><groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>4.1.53.Final</version>
 </dependency>
 
 | 
服务端

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 
 | public class NettyServer {public static void main(String[] args) throws InterruptedException {
 
 NioEventLoopGroup boosGroup = new NioEventLoopGroup();
 NioEventLoopGroup workerGroup = new NioEventLoopGroup();
 
 try {
 
 ServerBootstrap bootstrap = new ServerBootstrap();
 
 
 bootstrap.group(boosGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 128)
 .childOption(ChannelOption.SO_KEEPALIVE,true)
 
 .childHandler(new ChannelInitializer<SocketChannel>() {
 
 @Override
 protected void initChannel(SocketChannel socketChannel) throws Exception {
 
 System.out.println("客户SocketChannel hashcode="+socketChannel.hashCode());
 socketChannel.pipeline().addLast(new NettyServerHandler());
 }
 });
 
 System.out.println("......服务器 is ready...");
 
 ChannelFuture cf = bootstrap.bind(6688).sync();
 
 
 cf.channel().closeFuture().sync();
 } finally {
 boosGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
 }
 }
 
 | 

我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范),这时我们自定义一个Handler,才能称为一个handler。
- channelRead()方法读取客户端发送的消息
- channelReadComplete()方法数据读取完成
- exceptionCaught()方法处理异常,一般是需要关闭通道
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 
 | 
 
 
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
 
 
 
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
 
 ByteBuf buf = (ByteBuf) msg;
 System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
 System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
 }
 
 
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
 }
 
 
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 ctx.close();
 }
 }
 
 | 
客户端

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 
 | public class NettyClient {
 public static void main(String[] args) throws InterruptedException {
 
 
 NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
 
 try {
 
 Bootstrap bootstrap = new Bootstrap();
 
 
 bootstrap.group(eventExecutors)
 .channel(NioSocketChannel.class)
 .handler(new ChannelInitializer<SocketChannel>() {
 @Override
 protected void initChannel(SocketChannel socketChannel) throws Exception {
 socketChannel.pipeline().addLast(new NettyClientHandler());
 }
 });
 
 System.out.println("客户端 ok..");
 
 
 
 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
 
 channelFuture.channel().closeFuture().sync();
 } finally {
 eventExecutors.shutdownGracefully();
 }
 }
 }
 
 | 

- channelActive()方法:当通道就绪就会触发该方法
- channelRead()方法:当通道有读取事件时,会触发
- exceptionCaught()方法:出现异常执行
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 
 | public class NettyClientHandler extends ChannelInboundHandlerAdapter {
 
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
 System.out.println("client" + ctx);
 ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server", CharsetUtil.UTF_8));
 }
 
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 ByteBuf buf = (ByteBuf) msg;
 System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
 System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
 
 }
 
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 ctx.channel();
 }
 }
 
 | 


实例分析
BossGroup和WorkerGroup是怎么确定下面含有多少个子线程NioEventLoop的??
BossGroup和BorkerGroup含有的子线程(NioEventLoop)的个数 默认实际 cpu核数 * 2

一直往下追,到最后会看见如下源码

super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);等于0时为DEFAULT_EVENT_LOOP_THREADS

我们可以通过该代码System.out.println(Runtime.getRuntime().availableProcessors());查看自己电脑的CPU核数。

我的电脑CPU核数为8。所以默认的NioEventLoop个数为16。我们也可以通过Debug来验证。


我们把boosGroup设置为1,workerGroup设置为8。服务端查看客户端使用的线程号。

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 
 | 
 
 
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
 
 
 
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
 System.out.println("服务器读取线程"+Thread.currentThread().getName());
 System.out.println("server ctx = "+ ctx);
 
 
 ByteBuf buf = (ByteBuf) msg;
 System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
 System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
 }
 
 
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
 }
 
 
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 ctx.close();
 }
 }
 
 | 
允许并行运行Allow parallel run

接着启动服务端,和9个客户端。查看结果。

依次使用(类似轮询)
ChannelHandlerContextctx对象又包含什么呢??pipeline和channel的关系是什么呢?
ctx.channel();获取channel
ctx.pipeline();获取pipeline

| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 
 | 
 
 
 public class NettyServerHandler extends ChannelInboundHandlerAdapter {
 
 
 
 
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 
 System.out.println("服务器读取线程"+Thread.currentThread().getName());
 System.out.println("server ctx = "+ ctx);
 System.out.println("看看channel和pipeline的关系");
 Channel channel = ctx.channel();
 ChannelPipeline pipeline = ctx.pipeline();
 
 
 ByteBuf buf = (ByteBuf) msg;
 System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
 System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
 }
 
 
 @Override
 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 
 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
 }
 
 
 
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 cause.printStackTrace();
 ctx.close();
 }
 }
 
 | 
Debug运行查看运行结果。



ctx包含了许多信息,同时包含了pipeline和channel。
pipeline中包含了channel,channel中包含了pipeline。