Netty快速入门实例 TCP 服务

Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”

服务器可以回复消息给客户端 “hello, 客户端~”

首先导入依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.53.Final</version>
</dependency>

服务端

image-20201108170611983

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BoosGroup和WorkerGroup
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

try {
//创建服务器端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();

//使用链式编程链进行设置
bootstrap.group(boosGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
//.handler(null)//该handler对应bossGroup,childHandler对应workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//可以使用一个集合管理SocketChannel,再推送消息时,可以将业务加入到各个channel对应到NIOEventLoop到taskQueue或者scheduleTaskQueue
System.out.println("客户SocketChannel hashcode="+socketChannel.hashCode());
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们的workerGroup的EventLoop对应的管道设置处理器

System.out.println("......服务器 is ready...");
//启动服务器并绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6688).sync();

//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

image-20201108171806580

我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范),这时我们自定义一个Handler,才能称为一个handler。

  • channelRead()方法读取客户端发送的消息
  • channelReadComplete()方法数据读取完成
  • exceptionCaught()方法处理异常,一般是需要关闭通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//将msg转成一个ByteBuf
//ByteBuf是Netty提供的,不是NIO的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}

//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

//处理异常,一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

客户端

image-20201108172548785

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NettyClient {

public static void main(String[] args) throws InterruptedException {

//客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();

try {
//创建客户端启动对象Bootstrap 而不是服务端使用的ServerBootstrap
Bootstrap bootstrap = new Bootstrap();

//设置相关参数
bootstrap.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
}
});

System.out.println("客户端 ok..");

//启动客户端去连接服务端
//关于ChannelFuture要分析,涉及到netty到异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}

image-20201108172633372

  • channelActive()方法:当通道就绪就会触发该方法
  • channelRead()方法:当通道有读取事件时,会触发
  • exceptionCaught()方法:出现异常执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server", CharsetUtil.UTF_8));
}

//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());

}

//处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel();
}
}

image-20201108172846775

image-20201108172855031

实例分析

BossGroup和WorkerGroup是怎么确定下面含有多少个子线程NioEventLoop的??

BossGroup和BorkerGroup含有的子线程(NioEventLoop)的个数 默认实际 cpu核数 * 2

image-20201108174102996

一直往下追,到最后会看见如下源码

image-20201108174221044

super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);等于0时为DEFAULT_EVENT_LOOP_THREADS

image-20201108174348571

我们可以通过该代码System.out.println(Runtime.getRuntime().availableProcessors());查看自己电脑的CPU核数。

image-20201108174540695

我的电脑CPU核数为8。所以默认的NioEventLoop个数为16。我们也可以通过Debug来验证。

image-20201108174726184

image-20201108174744466

我们把boosGroup设置为1,workerGroup设置为8。服务端查看客户端使用的线程号。

image-20201108192015195

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("服务器读取线程"+Thread.currentThread().getName());
System.out.println("server ctx = "+ ctx);
//将msg转成一个ByteBuf
//ByteBuf是Netty提供的,不是NIO的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}

//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

//处理异常,一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

允许并行运行Allow parallel run

image-20201108192238610

接着启动服务端,和9个客户端。查看结果。

image-20201108192212012

依次使用(类似轮询)

ChannelHandlerContextctx对象又包含什么呢??pipelinechannel的关系是什么呢?

ctx.channel();获取channel

ctx.pipeline();获取pipeline

image-20201108193902593

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 1.我们自定义一个Handler需要继承netty规定好对某个HandlerAdapter(规范)
* 2.这时我们自定义一个Handler,才能称为一个handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

//读取数据实际(这里我们可以读取客户端发送的消息)
//1.ChannelHandlerContext ctx : 上下文对象,含有管道pipeline,通道channel,地址
//2.Object msg: 就是客户端发送的数据 默认Object
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("服务器读取线程"+Thread.currentThread().getName());
System.out.println("server ctx = "+ ctx);
System.out.println("看看channel和pipeline的关系");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();//本质是一个双向链表,出站入站
//将msg转成一个ByteBuf
//ByteBuf是Netty提供的,不是NIO的ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端发送地址是:"+ctx.channel().remoteAddress());
}

//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

//处理异常,一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

Debug运行查看运行结果。

image-20201108194619905

image-20201108194746009

image-20201108194858443

ctx包含了许多信息,同时包含了pipeline和channel。

pipeline中包含了channel,channel中包含了pipeline。