编码和解码的基本介绍

编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码。

image-20201122162541686

codec(编解码器) 的组成部分有两个:

  • decoder(解码器)和 encoder(编码器)。
  • encoder 负责把业务数据转换成字节码数据
  • decoder 负责把字节码数据转换成业务数据

Netty 自身提供了一些 codec(编解码器)

Netty 提供的编码器

  1. StringEncoder,对字符串数据进行编码

  2. ObjectEncoder,对 Java 对象进行编码

    ……

Netty 提供的解码器

  1. StringDecoder, 对字符串数据进行解码

  2. ObjectDecoder,对 Java 对象进行解码

    ……

但是Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题

  • 无法跨语言
  • 序列化后的体积太大,是二进制编码的 5 倍多。
  • 序列化性能太低

引出 新的解决方案 [Google 的 Protobuf]

Protobuf

Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用remote procedure call ] 数据交换格式

目前很多公司http+json--->tcp+protobuf

语言指南https://developers.google.com/protocol-buffers/docs/proto

image-20201122163237739

image-20201122163359366

Protobuf 是以message的方式来管理数据的.

支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝大多数语言,例如 C++、C#、Java、python 等),高性能,高可靠性。

使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述。说明,在idea 中编写 .proto 文件时,会自动提示是否下载 .ptotot 编写插件. 可以让语法高亮。(考察)

然后通过protoc.exe编译器根据.proto 自动生成.java 文件

image-20201122163714093

单对象案例

客户端可以发送一个Student PoJo 对象到服务器 (通过 Protobuf 编码)

服务端能接收Student PoJo 对象,并显示信息(通过 Protobuf 解码

环境搭建

首先Idea下载插件Protocol Buffer Editor

image-20201122163823243

下载protochttps://github.com/protocolbuffers/protobuf/releases?after=v3.7.1解压

image-20201122165122886

image-20201122165222077

导入依赖

1
2
3
4
5
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>

代码编写

发送对象

Student.proto

image-20201122165332260

1
2
3
4
5
6
7
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
string name = 2;
}

写完之后将该文件复制到protoc的bin目录下(之所以这样做是没配置环境变量,MAC必须要配置)

image-20201122170235076

进入终端输protoc --java_out=. Student.proto

image-20201122170217001

得到.java文件

image-20201122170258203

复制到项目中,查看。

image-20201122170449899

image-20201122170557319

客服端

image-20201122171808790

socketChannel.pipeline().addLast("encoder", new ProtobufEncoder());在pipeline中加入ProtoBufEncoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class NettyClient {

public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();

try {
//创建客户端启动对象Bootstrap 而不是服务端使用的ServerBootstrap
Bootstrap bootstrap = new Bootstrap();

//设置相关参数
bootstrap.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipeline中加入ProtoBufEncoder
socketChannel.pipeline().addLast("encoder", new ProtobufEncoder());
socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
}
});

System.out.println("客户端 ok..");

//启动客户端去连接服务端
//关于ChannelFuture要分析,涉及到netty到异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}

image-20201122171956419

StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("豹子头 林冲").build();通过内部类构造Student对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发生一个Student对象到服务器
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("豹子头 林冲").build();

ctx.writeAndFlush(student);
}

//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:"+ctx.channel().remoteAddress());

}

//处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel();
}
}

服务器

image-20201122172338592

socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));添加Protobuf解码器指定对Student对象解码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class NettyServer {
public static void main(String[] args) throws InterruptedException {

//创建BoosGroup和WorkerGroup
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
//创建服务器端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();

//使用链式编程链进行设置
bootstrap.group(boosGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
//.handler(null)//该handler对应bossGroup,childHandler对应workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipeline中加入ProtobufDecoder 指定对Student对象解码
socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们的workerGroup的EventLoop对应的管道设置处理器

System.out.println("......服务器 is ready...");
//启动服务器并绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6688).sync();

//给cf注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6688 成功");
} else {
System.out.println("监听端口 6688 失败");
}
}
});

//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

image-20201122172648621

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
//读取从客户端发发送端StudentPojo.Student

System.out.println("客户端发送端数据 id=" + msg.getId() + " 名字=" + msg.getName());
}

//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

//处理异常,一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

测试

启动服务器和客户端,接受成功。

image-20201122172811413

多对象案例

1)客户端可以随机发送Student PoJo/ Worker PoJo 对象到服务器 (通过 Protobuf 编码)

2)服务端能接收Student PoJo/ Worker PoJo 对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)

image-20201122194132032

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
syntax = "proto3";//版本
option optimize_for = SPEED;//加快解析
option java_package = "com.kylin.netty.codec2";//指定生成到哪个包下
option java_outer_classname = "MyDataInfo";//外部类名


//protobuf 可以使用message 管理其他的message
message MyMessage{
//定义一个枚举类型
enum DataType{
StudentType = 0;//在proto3要求enum的编号从0开始
WorkerType = 1;
}
//用data_type来标识传的是哪一个枚举类型
DataType data_type = 1;

//表示每次枚举类型最多只能出现其中的一个,节省空间
oneof dataBody{
Student student = 2;
Worker worker = 3;
}

}

message Student{
int32 id = 1;//Student类的属性
string name = 2;
}
message Worker{
string name = 1;
int32 age = 2;
}

同样在bin目录下protoc --java_out=. Student.proto

image-20201122194443222

image-20201122194456289

复制到项目目录中并打开。

image-20201122195703951

客户端

image-20201122195821463

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class NettyClient {

public static void main(String[] args) throws InterruptedException {

//客户端需要一个事件循环组
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();

try {
//创建客户端启动对象Bootstrap 而不是服务端使用的ServerBootstrap
Bootstrap bootstrap = new Bootstrap();

//设置相关参数
bootstrap.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipeline中加入ProtoBufEncoder
socketChannel.pipeline().addLast("encoder", new ProtobufEncoder());
socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
}
});

System.out.println("客户端 ok..");

//启动客户端去连接服务端
//关于ChannelFuture要分析,涉及到netty到异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}

image-20201122195919821

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机发送Student或Worker对象
int random = new Random().nextInt(3);
MyDataInfo.MyMessage message = null;
if (0 == random) {//发送student对象
message = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType)
.setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build())
.build();
} else {
message = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
.setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("法外狂徒 张三").build())
.build();
}

ctx.writeAndFlush(message);
}

//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());

}

//处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel();
}
}

服务器

image-20201122200204712

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class NettyServer {
public static void main(String[] args) throws InterruptedException {

//创建BoosGroup和WorkerGroup
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);

try {
//创建服务器端启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();

//使用链式编程链进行设置
bootstrap.group(boosGroup, workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128)//设置线程队列等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
//.handler(null)//该handler对应bossGroup,childHandler对应workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//在pipeline中加入ProtobufDecoder
socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});//给我们的workerGroup的EventLoop对应的管道设置处理器

System.out.println("......服务器 is ready...");
//启动服务器并绑定一个端口并且同步,生成了一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6688).sync();

//给cf注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6688 成功");
} else {
System.out.println("监听端口 6688 失败");
}
}
});

//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

image-20201122200342707

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
//读取从客户端发送的数据
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
//发送的是学生数据
if (dataType == MyDataInfo.MyMessage.DataType.StudentType) {
System.out.println("[学生] " + "ID=" + msg.getStudent().getId() + " Name=" + msg.getStudent().getName());
} else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) {//发送的是工人数据
System.out.println("[工人] " + "Age=" + msg.getWorker().getAge() + " Name=" + msg.getWorker().getName());
}
}

//数据读取完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//write()+Flush(),将数据写入到缓存并刷新 对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

//处理异常,一般是需要关闭通道

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

测试

运行一个服务器运行多个客户端查看结果

image-20201122200518261

测试成功~