THsHaServer
HsHa=HalfSync HalfAsync 半同步,半异步
在处理IO事件时是同步的,在执行invoke方法时是在线程池异步执行的。
本质上是一个添加了工作线程池的Reactor模型:
内部使用了一个自定义的线程池ExecutorService,用线程池中的线程执行FrameBuffer的Invoke方法,非常简单。上面已经说明invoke()方法只执行了服务端本地的接口实现类,并没有做IO操作,所以在THsHaServer中IO操作还是在SelectThread线程内完成的。
1 | //线程池 invoker |
下面是THsHaServer对线程池的创建和销毁代码:
Options的默认配置是5个线程,60s空闲时间。
1 | //创建线程池 |
TThreadSelectorServer
我使用的0.5.x版本没有这个server实现,在0.8.x版本thrift添加了TThreadedSelectorServer
据thrift描述:
在多核环境中,如果瓶颈是单线程的selector获得的CPU计算能力不足, 那么它的性能要优于TNonblockingServer/THsHaServer。
而且because the accept handling is decoupled from
reads/writes and invocation, the server has better ability to handle back-pressure from new connections
(backpress背压,是一个很有意思的名词,在很多地方有用到 比如RxJava 或者背压阀)
TThreadSelectorServer是Multiple Reactors模式的实现,
mainReactor只负责完成accept操作,子reactor处理读、写事件。图中只画出来一个subReactor线程的情况,实际可能配置多个线程运行subReactor。
非阻塞服务端其他实现/reactor-pool.png)
在实现Multiple Reactors时,TThreadSelectorServer的AcceptThread相当于MainReactor,SelectorThread相当于subReactor,下面只选择部分代码做介绍:
TThreadedSelectorServer.java
1 | //startThreads()方法,初始化AcceptThread和SelectorThread(集合): |
AcceptThread.java
run()方法里只执行select()操作:
1 | public void run() { |
在handleAccept()方法中,处理客户端的建立连接的请求:
AcceptThread有两种工作模式,FAST_ACCEPT是收到连接请求就接受,FAIR_ACCEPT是把连接请求丢到工作线程池invoker中,这样会等之前的连接请求被工作线程执行后,才会处理后来的连接请求:
1 | private void handleAccept() { |
doAddAccept()方法,把client连接交给SelectorThread处理,后续的其他操作就和AcceptThread无关了:
1 | private void doAddAccept(SelectorThread thread, TNonblockingTransport client) { |
SelectorThread.java
addAcceptedConnection()方法
AcceptThread调用它把任务交给SelectorThread处理,客户端连接会被放入SelectorThread的acceptedQueue等待处理:
1 | public boolean addAcceptedConnection(TNonblockingTransport accepted) { |
在SelectorThread的run()方法
完成对读/写IO事件的select(),处理acceptQueue中的任务,执行processInterestChanges(之前介绍过,触发FrameBuffer的关注事件更新)
1 | while (!stopped_) { |
processAcceptedConnections()方法
从acceptQueue队列取出客户端连接,向selector注册OP_READ事件
1 | private void processAcceptedConnections() { |