编码和解码的基本介绍
编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码。

codec(编解码器) 的组成部分有两个:
- decoder(解码器)和 encoder(编码器)。
- encoder 负责把业务数据转换成字节码数据
- decoder 负责把字节码数据转换成业务数据
Netty 自身提供了一些 codec(编解码器)
Netty 提供的编码器
StringEncoder,对字符串数据进行编码
ObjectEncoder,对 Java 对象进行编码
……
Netty 提供的解码器
StringDecoder, 对字符串数据进行解码
ObjectDecoder,对 Java 对象进行解码
……
但是Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题
- 无法跨语言
- 序列化后的体积太大,是二进制编码的 5 倍多。
- 序列化性能太低
引出 新的解决方案 [Google 的 Protobuf
]
Protobuf
Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用remote procedure call ] 数据交换格式
目前很多公司http+json--->tcp+protobuf
语言指南https://developers.google.com/protocol-buffers/docs/proto


Protobuf 是以message
的方式来管理数据的.
支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝大多数语言,例如 C++、C#、Java、python 等),高性能,高可靠性。
使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述。说明,在idea 中编写 .proto 文件时,会自动提示是否下载 .ptotot 编写插件. 可以让语法高亮。(考察)
然后通过protoc.exe
编译器根据.proto 自动生成.java 文件

单对象案例
客户端可以发送一个Student PoJo 对象到服务器 (通过 Protobuf 编码)
服务端能接收Student PoJo 对象,并显示信息(通过 Protobuf 解码
环境搭建
首先Idea下载插件Protocol Buffer Editor

下载protochttps://github.com/protocolbuffers/protobuf/releases?after=v3.7.1解压


导入依赖
1 2 3 4 5
| <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency>
|
代码编写
发送对象
Student.proto

1 2 3 4 5 6 7
| syntax = "proto3"; option java_outer_classname = "StudentPOJO";
message Student { int32 id = 1; string name = 2; }
|
写完之后将该文件复制到protoc的bin目录下(之所以这样做是没配置环境变量,MAC必须要配置)

进入终端输protoc --java_out=. Student.proto

得到.java文件

复制到项目中,查看。


客服端

socketChannel.pipeline().addLast("encoder", new ProtobufEncoder());
在pipeline中加入ProtoBufEncoder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class NettyClient {
public static void main(String[] args) throws InterruptedException { NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("encoder", new ProtobufEncoder()); socketChannel.pipeline().addLast(new NettyClientHandler()); } });
System.out.println("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync(); channelFuture.channel().closeFuture().sync(); } finally { eventExecutors.shutdownGracefully(); } } }
|

StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("豹子头 林冲").build();
通过内部类构造Student对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("豹子头 林冲").build();
ctx.writeAndFlush(student); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址:"+ctx.channel().remoteAddress());
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel(); } }
|
服务器

socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
添加Protobuf解码器指定对Student对象解码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public class NettyServer { public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try { ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance())); socketChannel.pipeline().addLast(new NettyServerHandler()); } });
System.out.println("......服务器 is ready..."); ChannelFuture cf = bootstrap.bind(6688).sync();
cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6688 成功"); } else { System.out.println("监听端口 6688 失败"); } } });
cf.channel().closeFuture().sync(); } finally { boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {
@Override protected void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
System.out.println("客户端发送端数据 id=" + msg.getId() + " 名字=" + msg.getName()); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
测试
启动服务器和客户端,接受成功。

多对象案例
1)客户端可以随机发送Student PoJo/ Worker PoJo 对象到服务器 (通过 Protobuf 编码)
2)服务端能接收Student PoJo/ Worker PoJo 对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| syntax = "proto3"; option optimize_for = SPEED; option java_package = "com.kylin.netty.codec2"; option java_outer_classname = "MyDataInfo";
message MyMessage{ enum DataType{ StudentType = 0; WorkerType = 1; } DataType data_type = 1;
oneof dataBody{ Student student = 2; Worker worker = 3; }
}
message Student{ int32 id = 1; string name = 2; } message Worker{ string name = 1; int32 age = 2; }
|
同样在bin目录下protoc --java_out=. Student.proto


复制到项目目录中并打开。

客户端

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("encoder", new ProtobufEncoder()); socketChannel.pipeline().addLast(new NettyClientHandler()); } });
System.out.println("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6688).sync(); channelFuture.channel().closeFuture().sync(); } finally { eventExecutors.shutdownGracefully(); } } }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { int random = new Random().nextInt(3); MyDataInfo.MyMessage message = null; if (0 == random) { message = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType) .setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()) .build(); } else { message = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType) .setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("法外狂徒 张三").build()) .build(); }
ctx.writeAndFlush(message); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel(); } }
|
服务器

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public class NettyServer { public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boosGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try { ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance())); socketChannel.pipeline().addLast(new NettyServerHandler()); } });
System.out.println("......服务器 is ready..."); ChannelFuture cf = bootstrap.bind(6688).sync();
cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6688 成功"); } else { System.out.println("监听端口 6688 失败"); } } });
cf.channel().closeFuture().sync(); } finally { boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception { MyDataInfo.MyMessage.DataType dataType = msg.getDataType(); if (dataType == MyDataInfo.MyMessage.DataType.StudentType) { System.out.println("[学生] " + "ID=" + msg.getStudent().getId() + " Name=" + msg.getStudent().getName()); } else if (dataType == MyDataInfo.MyMessage.DataType.WorkerType) { System.out.println("[工人] " + "Age=" + msg.getWorker().getAge() + " Name=" + msg.getWorker().getName()); } }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
测试
运行一个服务器运行多个客户端查看结果

测试成功~