博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty线程模型
阅读量:6290 次
发布时间:2019-06-22

本文共 8089 字,大约阅读时间需要 26 分钟。

前言

Netty是一个高性能、异步事件驱动的NIO框架,提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty构建,比如RPC框架、zookeeper等。

那么,Netty性能为啥这么高?主要是因为其内部Reactor模型的实现。

Reactor模型

Netty中的Reactor模型主要由多路复用器(Acceptor)、事件分发器(Dispatcher)、事件处理器(Handler)组成,可以分为三种。

1、单线程模型:所有I/O操作都由同一个线程完成,即多路复用、事件分发和处理都是在一个Reactor线程上完成的。

Reactor单线程模型示意图如下所示:

 

由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处理所有IO相关的操作。从架构层面看,一个NIO线程确实可以完成其承担的职责。例如,通过Acceptor类接收客户端的TCP连接请求消息,链路建立成功之后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上进行消息解码。用户线程可以通过消息编码通过NIO线程将消息发送给客户端。

对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下:

1)一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;

2)当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

3)可靠性问题:一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。

2、多线程模型:为了解决单线程模型存在的一些问题,演化而来的Reactor线程模型。

Rector多线程模型与单线程模型最大的区别就是有一组NIO线程处理IO操作,它的原理图如下:

Reactor多线程模型的特点:

1)有专门一个NIO线程-Acceptor线程用于监听服务端,接收客户端的TCP连接请求;

2)网络IO操作-读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;

3)1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。

在绝大多数场景下,Reactor多线程模型都可以满足性能需求;但是,在极个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。

 3、主从多线程模型:采用多个reactor,每个reactor都在自己单独的线程里执行。如果是多核,则可以同时响应多个客户端的请求,一旦链路建立成功就将链路注册到负责I/O读写的SubReactor线程池上。

主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(sub reactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的IO线程上,由IO线程负责后续的IO操作。

它的线程模型如下图所示:

 

 

利用主从NIO线程模型,可以解决1个服务端监听线程无法有效处理所有客户端连接的性能不足问题。

它的工作流程总结如下:

  1. 从主线程池中随机选择一个Reactor线程作为Acceptor线程,用于绑定监听端口,接收客户端连接;
  2. Acceptor线程接收客户端连接请求之后创建新的SocketChannel,将其注册到主线程池的其它Reactor线程上,由其负责接入认证、IP黑白名单过滤、握手等操作;
  3. 步骤2完成之后,业务层的链路正式建立,将SocketChannel从主线程池的Reactor线程的多路复用器上摘除,重新注册到Sub线程池的线程上,用于处理I/O的读写操作。

示例代码

以下是server和client的示例代码,其中使用的是 Netty 4.x,先看看如何实现,后续会针对各个模块进行深入分析。

server 代码实现

public class EchoServer {    private final int port;    public EchoServer(int port) {        this.port = port;    }    public void run() throws Exception {        // Configure the server.        EventLoopGroup bossGroup = new NioEventLoopGroup();  // (1)        EventLoopGroup workerGroup = new NioEventLoopGroup();          try {            ServerBootstrap b = new ServerBootstrap(); // (2)            b.group(bossGroup, workerGroup)             .channel(NioServerSocketChannel.class) // (3)             .option(ChannelOption.SO_BACKLOG, 100)             .handler(new LoggingHandler(LogLevel.INFO))             .childHandler(new ChannelInitializer
() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( //new LoggingHandler(LogLevel.INFO), new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(port).sync(); // (5) // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new EchoServer(port).run(); }}

EchoServerHandler 实现

public class EchoServerHandler extends ChannelInboundHandlerAdapter {        private static final Logger logger = Logger.getLogger(              EchoServerHandler.class.getName());        @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          ctx.write(msg);      }        @Override      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {          ctx.flush();      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {          // Close the connection when an exception is raised.          logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);          ctx.close();      }  }

1、NioEventLoopGroup 是用来处理I/O操作的线程池,Netty对 EventLoopGroup 接口针对不同的传输协议提供了不同的实现。在本例子中,需要实例化两个NioEventLoopGroup,通常第一个称为“boss”,用来accept客户端连接,另一个称为“worker”,处理客户端数据的读写操作。

2、ServerBootstrap 是启动服务的辅助类,有关socket的参数可以通过ServerBootstrap进行设置。
3、这里指定NioServerSocketChannel类初始化channel用来接受客户端请求。
4、通常会为新SocketChannel通过添加一些handler,来设置ChannelPipeline。ChannelInitializer 是一个特殊的handler,其中initChannel方法可以为SocketChannel 的pipeline添加指定handler。
5、通过绑定端口8080,就可以对外提供服务了。

client 代码实现

public class EchoClient {        private final String host;      private final int port;      private final int firstMessageSize;        public EchoClient(String host, int port, int firstMessageSize) {          this.host = host;          this.port = port;          this.firstMessageSize = firstMessageSize;      }        public void run() throws Exception {          // Configure the client.          EventLoopGroup group = new NioEventLoopGroup();          try {              Bootstrap b = new Bootstrap();              b.group(group)               .channel(NioSocketChannel.class)               .option(ChannelOption.TCP_NODELAY, true)               .handler(new ChannelInitializer
() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( //new LoggingHandler(LogLevel.INFO), new EchoClientHandler(firstMessageSize)); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { final String host = args[0]; final int port = Integer.parseInt(args[1]); final int firstMessageSize; if (args.length == 3) { firstMessageSize = Integer.parseInt(args[2]); } else { firstMessageSize = 256; } new EchoClient(host, port, firstMessageSize).run(); } }

EchoClientHandler 实现

public class EchoClientHandler extends ChannelInboundHandlerAdapter {        private static final Logger logger = Logger.getLogger(              EchoClientHandler.class.getName());        private final ByteBuf firstMessage;        /**      * Creates a client-side handler.      */      public EchoClientHandler(int firstMessageSize) {          if (firstMessageSize <= 0) {              throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);          }          firstMessage = Unpooled.buffer(firstMessageSize);          for (int i = 0; i < firstMessage.capacity(); i ++) {              firstMessage.writeByte((byte) i);          }      }        @Override      public void channelActive(ChannelHandlerContext ctx) {          ctx.writeAndFlush(firstMessage);      }        @Override      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {          ctx.write(msg);      }        @Override      public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {         ctx.flush();      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {          // Close the connection when an exception is raised.          logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);          ctx.close();      }  } 

参考:

转载地址:http://sykta.baihongyu.com/

你可能感兴趣的文章
如何判断一条sql(update,delete)语句是否执行成功
查看>>
JSONObject.parseObject(jsonStr);和JSONObject.fromObject(jsonStr);
查看>>
【集训队作业2018】小Z的礼物
查看>>
ClientScriptManager与ScriptManager向客户端注册脚本的区别
查看>>
js和php中几种生成验证码的方式
查看>>
android UI进阶之仿iphone的tab效果1
查看>>
这是我的第1个C#程序(向控制台输出一句话)
查看>>
html
查看>>
Xqk.Data数据框架开发指南:丰富的、灵活的查询方法(第三部分:SqlField)
查看>>
Java基本语法
查看>>
MapReduce对交易日志进行排序的Demo(MR的二次排序)
查看>>
online-compiler 在线编译器
查看>>
9. Palindrome Number - Easy
查看>>
使用vs2017编译live555
查看>>
洛谷——P1347 排序
查看>>
uboot2009 nandflash移植
查看>>
gulp-usemin 插件使用
查看>>
int数据类型的最大数
查看>>
OI养老专题02:约瑟夫问题求幸存者
查看>>
Python多线程
查看>>