Reactor模式

作者: 日期:2019-09-11

doug lea 在scalable io in java中分别描述了单凯发888国际娱乐网线程的reactor,多线程模式的reactor以及多reactor线程模式。

单线程的reactor,主要依赖java nio中的channel,buffer,selector,selectionkey。在单线程reactor模式中,不仅i/o操作在该reactor线程上,连非i/o的业务操作也在该线程上进行处理了,这可能会大大延迟i/o请求的响应

img

在多线程reactor中添加了一个工作线程池,将非i/o操作从reactor线程中移出转交给工作者线程池来执行。这样能够提高reactor线程的i/o响应,不至于因为一些耗时的业务逻辑而延迟对后面i/o请求的处理,但是所有的i/o操作依旧由一个reactor来完成,包括i/o的accept、read、write以及connect操作

img

多reactor线程模式将 接受客户端的连接请求 和 与该客户端的通信 分在了两个reactor线程来完成。mainreactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subreactor线程来完成与客户端的通信,这样一来就不会因为read数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多reactor线程模式在海量的客户端并发请求的情况下,还可以通过实现subreactor线程池来将海量的连接分发给多个subreactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量

img

代码示例:

// nio selector 多路复用reactor线程模型
public class nioreactor {
 // 处理业务操作的线程池
 private static executorservice workpool = executors.newcachedthreadpool;
 // 封装了selector.select等事件轮询的代码
 abstract class reactorthread extends thread {
 selector selector;
 linkedblockingqueue runnable taskqueue = new linkedblockingqueue ;
 volatile boolean running = false;
 private reactorthread throws ioexception {
 selector = selector.open;
 // selector监听到有事件后,调用这个方法
 public abstract void handler throws exception;
 @override
 public void run {
 // 轮询selector事件
 while  {
 try {
 // 执行队列中的任务
 runnable task;
 while ) != null) {
 task.run;
 selector.select;
 // 获取查询结果
 set selectionkey selectionkeys = selector.selectedkeys;
 // 遍历查询结果
 iterator selectionkey keyiterator = selectionkeys.iterator;
 while ) {
 // 被封装的查询结果
 selectionkey selectionkey = keyiterator.next;
 keyiterator.remove;
 int readyops = selectionkey.readyops;
 // 关注 read 和 accept两个事件
 if ) != 0
 || readyops == 0) {
 try {
 selectablechannel channel =  selectionkey.attachment;
 channel.configureblocking;
 handler;
 // 如果关闭了,就取消这个key的订阅
 if ) {
 selectionkey.cancel;
 } catch  {
 // 如果有异常,就取消这个key的订阅
 selectionkey.cancel;
 e.printstacktrace;
 } catch  {
 e.printstacktrace;
 private selectionkey register throws exception {
 // 为什么register要以任务提交的形式,让reactor线程去处理?
 // 因为线程在执行channel注册到selector的过程中,会和调用selector.select方法的线程争用同一把锁
 // 而select方法实在eventloop中通过while循环调用的,争抢的可能性很高,
 // 为了让register能更快的执行,就放到同一个线程来处理
 futuretask selectionkey futuretask =
 new futuretask  - channel.register);
 taskqueue.add;
 return futuretask.get;
 private void dostart {
 if  {
 running = true;
 start;
 private serversocketchannel serversocketchannel;
 // 1、创建多个线程 - accept处理reactor线程 
 private reactorthread[] mainreactorthreads = new reactorthread[1];
 // 2、创建多个线程 - io处理reactor线程 
 private reactorthread[] subreactorthreads = new reactorthread[8];
 // 初始化线程组
 private void newgroup throws ioexception {
 // 创建mainreactor线程, 只负责处理serversocketchannel
 for  {
 mainreactorthreads[i] =
 new reactorthread {
 atomicinteger incr = new atomicinteger;
 @override
 public void handler throws exception {
 // 只做请求分发,不做具体的数据读取
 serversocketchannel ch =  channel;
 socketchannel socketchannel = ch.accept;
 socketchannel.configureblocking;
 // 收到连接建立的通知之后,分发给i/o线程继续去读取数据
 int index = incr.getandincrement % subreactorthreads.length;
 reactorthread workeventloop = subreactorthreads[index];
 workeventloop.dostart;
 selectionkey selectionkey = workeventloop.register;
 selectionkey.interestops;
 system.out.println.getname + "收到新连接 : " + socketchannel.getremoteaddress);
 // 创建io线程,负责处理客户端连接以后socketchannel的io读写
 for  {
 subreactorthreads[i] =
 new reactorthread {
 @override
 public void handler throws exception {
 // work线程只负责处理io处理,不处理accept事件
 socketchannel ch =  channel;
 bytebuffer requestbuffer = bytebuffer.allocate;
 while  ch.read != -1) {
 // 长连接情况下,需要手动判断数据有没有读取结束 
 if  0) break;
 if  == 0) return; // 如果没数据了, 则不继续后面的处理
 requestbuffer.flip;
 byte[] content = new byte[requestbuffer.limit];
 requestbuffer.get;
 system.out.println);
 system.out.println.getname + "收到数据,来自:" + ch.getremoteaddress);
 // todo 业务操作 数据库、接口...
 workpool.submit - {});
 // 响应结果 200
 string response =
 "http/1.1 200 ok\r
" + "content-length: 11\r
\r
" + "hello world";
 bytebuffer buffer = bytebuffer.wrap);
 while ) {
 ch.write;
 // 始化channel,并且绑定一个eventloop线程
 private void initandregister throws exception {
 // 1、 创建serversocketchannel
 serversocketchannel = serversocketchannel.open;
 serversocketchannel.configureblocking;
 // 2、 将serversocketchannel注册到selector
 int index = new random.nextint;
 mainreactorthreads[index].dostart;
 selectionkey selectionkey = mainreactorthreads[index].register;
 selectionkey.interestops;
 // 绑定端口
 private void bind throws ioexception {
 // 1、 正式绑定端口,对外服务
 serversocketchannel.bind);
 system.out.println;
 public static void main throws exception {
 nioreactor nioreactor = new nioreactor;
 // 1、 创建main和sub两组线程
 nioreactor.newgroup;
 // 2、 创建serversocketchannel,注册到mainreactor线程上的selector上
 nioreactor.initandregister;
 // 3、 为serversocketchannel绑定端口
 nioreactor.bind;
首页
电话
短信
联系