在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epoll bug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。
Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
从图中就能看出 Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。
具备如下优点
不同的线程模式,对程序的性能有很大影响,在学习Netty线程模式之前,首先讲解下各个线程模式, 最后看看 Netty 线程模型有什么优越性。目前存在的线程模型有:
传统阻塞 I/O 服务模型
Reactor 模型
根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现
采用阻塞 IO 模式获取输入的数据, 每个连接都需要独立的线程完成数据的输入 , 业务处理和数据返回工作。
存在问题
Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher模式. Reactor 模式使用IO 复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键.
优点
优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点
单 Reactor多线程
优点
可以充分的利用多核 cpu 的处理能力
缺点
多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈
主从 Reactor 多线程
优点
缺点
这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括Nginx、Memcached、Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。
Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。
简单版Netty模型
进阶版Netty模型
详细版Netty模型
Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是NioEventLoopGroup
NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop
NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)
NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
每个 BossNioEventLoop 中循环执行以下三个步骤
每个 WorkerNioEventLoop 中循环执行以下三个步骤
在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。
ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具 体的业务逻辑。API 关系如下图所示
Netty开发中需要自定义一个 Handler 类去实现 ChannelHandle接口或其子接口或其实现类,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法
ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的责任链.
如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler. InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler是按照Pipeline的加载顺序,逆序执行
这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时ChannelHandlerContext 中也绑定了对应的 ChannelPipeline和 Channel 的信息,方便对
ChannelHandler 进行调用。常用方法如下所示
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:
ChannelOption.SO_BACKLOG
对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。
ChannelOption.SO_KEEPALIVE
一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。常用方法如下所示:
常用方法如下所示:
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如: BossEventLoopGroup 和 WorkerEventLoopGroup。
通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。
一般情况下我们都是用实现类NioEventLoopGroup.
常用方法如下所示:
ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置;Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下 所示:
这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:
Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标
io.netty netty-all 4.1.42.Final
服务端实现步骤
代码实现
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** Netty服务端*/
public class NettyServer {public static void main(String[] args) throws InterruptedException {//1. 创建bossGroup线程组: 处理网络事件--连接事件EventLoopGroup bossGroup = new NioEventLoopGroup(1);//2. 创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数EventLoopGroup workerGroup = new NioEventLoopGroup();//3. 创建服务端启动助手ServerBootstrap serverBootstrap = new ServerBootstrap();//4. 设置bossGroup线程组和workerGroup线程组serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) //5. 设置服务端通道实现为NIO.option(ChannelOption.SO_BACKLOG, 128)//6. 参数设置.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6. 参数设置.childHandler(new ChannelInitializer() { //7. 创建一个通道初始化对象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//8. 向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new NettyServerHandler());}});//9. 启动服务端并绑定端口,同时将异步改为同步ChannelFuture future = serverBootstrap.bind(9999);future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("端口绑定成功!");} else {System.out.println("端口绑定失败!");}}});System.out.println("服务端启动成功.");//10. 关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池future.channel().closeFuture().sync();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}
}
自定义服务端handle
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;/*** 自定义处理Handler*/
public class NettyServerHandler implements ChannelInboundHandler {/*** 通道读取事件** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}/*** 通道读取完毕事件** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("你好.我是Netty服务端",CharsetUtil.UTF_8));//消息出站}/*** 通道异常事件** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}
}
客户端实现步骤
代码实现
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;/*** 客户端*/
public class NettyClient {public static void main(String[] args) throws InterruptedException {//1. 创建线程组EventLoopGroup group = new NioEventLoopGroup();//2. 创建客户端启动助手Bootstrap bootstrap = new Bootstrap();//3. 设置线程组bootstrap.group(group).channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO.handler(new ChannelInitializer() { //5. 创建一个通道初始化对象@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//6. 向pipeline中添加自定义业务处理handlerch.pipeline().addLast(new NettyClientHandler());}});//7. 启动客户端,等待连接服务端,同时将异步改为同步ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();//8. 关闭通道和关闭连接池channelFuture.channel().closeFuture().sync();group.shutdownGracefully();}
}
自定义客户端handle
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;/*** 客户端处理类*/
public class NettyClientHandler implements ChannelInboundHandler {/*** 通道就绪事件** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端",CharsetUtil.UTF_8));future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("数据发送成功!");} else {System.out.println("数据发送失败!");}}});}/*** 通道读就绪事件** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println("服务端发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
}
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得IO 操作结果. Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制)
表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口. ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。
常用方法有
给Future添加监听器,监听操作结果
代码实现
服务端异步接收连接
//9. 启动服务端并绑定端口, 异步接收连接ChannelFuture future = serverBootstrap.bind(9999);future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("端口绑定成功!");} else {System.out.println("端口绑定失败!");}}});
客户端异步写数据
ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端",CharsetUtil.UTF_8));future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("数据发送成功!");} else {System.out.println("数据发送失败!");}}});