Netty快速入门实例 TCP 服务
Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
服务器可以回复消息给客户端 “hello, 客户端~”
首先导入依赖
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.53.Final</version> </dependency>
|
服务端

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class NettyServer { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup boosGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try { ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE,true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { System.out.println("客户SocketChannel hashcode="+socketChannel.hashCode()); socketChannel.pipeline().addLast(new NettyServerHandler()); } });
System.out.println("......服务器 is ready..."); ChannelFuture cf = bootstrap.bind(6688).sync();
cf.channel().closeFuture().sync(); } finally { boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|

我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范),这时我们自定义一个Handler,才能称为一个handler。
channelRead()
方法读取客户端发送的消息
channelReadComplete()
方法数据读取完成
exceptionCaught()
方法处理异常,一般是需要关闭通道
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress()); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
客户端

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } });
System.out.println("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync(); channelFuture.channel().closeFuture().sync(); } finally { eventExecutors.shutdownGracefully(); } } }
|

channelActive()
方法:当通道就绪就会触发该方法
channelRead()
方法:当通道有读取事件时,会触发
exceptionCaught()
方法:出现异常执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client" + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server", CharsetUtil.UTF_8)); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel(); } }
|


实例分析
BossGroup和WorkerGroup是怎么确定下面含有多少个子线程NioEventLoop的??
BossGroup和BorkerGroup含有的子线程(NioEventLoop)的个数 默认实际 cpu核数 * 2

一直往下追,到最后会看见如下源码

super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
等于0时为DEFAULT_EVENT_LOOP_THREADS

我们可以通过该代码System.out.println(Runtime.getRuntime().availableProcessors());
查看自己电脑的CPU核数。

我的电脑CPU核数为8。所以默认的NioEventLoop个数为16。我们也可以通过Debug来验证。


我们把boosGroup设置为1,workerGroup设置为8。服务端查看客户端使用的线程号。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程"+Thread.currentThread().getName()); System.out.println("server ctx = "+ ctx); ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress()); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
允许并行运行Allow parallel run

接着启动服务端,和9个客户端。查看结果。

依次使用(类似轮询)
ChannelHandlerContext
ctx对象又包含什么呢??pipeline
和channel
的关系是什么呢?
ctx.channel();
获取channel
ctx.pipeline();
获取pipeline

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程"+Thread.currentThread().getName()); System.out.println("server ctx = "+ ctx); System.out.println("看看channel和pipeline的关系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress()); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
Debug运行查看运行结果。



ctx包含了许多信息,同时包含了pipeline和channel。
pipeline中包含了channel,channel中包含了pipeline。