实例要求
- 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
- 实现多人群聊
- 服务器端:可以监测用户上线,离线,并实现消息转发功能
- 客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
目的:进一步理解Netty非阻塞网络编程机制
服务端
编写GroupChatServer类

GroupChatServerHandler

private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class GroupChatServer { private int port;
public GroupChatServer(int port) { this.port = port; }
public void run() throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(8); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new GroupChatServerHandler()); } });
System.out.println("Netty 服务器启动"); ChannelFuture channelFuture = bootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new GroupChatServer(7000).run(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new Date()) + "\n"); channelGroup.add(channel); }
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n"); System.out.println("当前channelGroup大小:" + channelGroup.size()); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + " 上线了~"+sdf.format(new Date()) + "\n"); }
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + " 离线了~"+sdf.format(new Date()) + "\n"); }
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); channelGroup.forEach(ch -> { if (channel != ch) { ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息:" + msg + sdf.format(new Date()) + "\n"); } else { ch.writeAndFlush("[自己]" + " 发送了消息:" + msg + "\n"); } }); } }
|
客户端
GroupChatClient


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public class GroupChatClient {
private final String host; private final int port;
public GroupChatClient(String host, int port) { this.host = host; this.port = port; }
public void run() throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new GroupChatClientHandler()); } });
ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); Channel channel = channelFuture.channel(); System.out.println("--------" + channel.localAddress() + "--------"); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String msg = scanner.nextLine(); channel.writeAndFlush(msg+"\r\n"); } } finally { group.shutdownGracefully(); }
}
public static void main(String[] args) throws InterruptedException { new GroupChatClient("127.0.0.1",7000).run(); } }
|
1 2 3 4 5 6 7 8
| public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); } }
|
测试
