前言

学习了NIO(New Input/Output)的基础后,已经对IO的多路复用有了初步的了解。关于IO多路复用的技术还有哪些值得深入学习的,大家所熟知的Reactor网络编程模式便是使用IO多路复用技术。希望文章可以帮助你对Reactor模式的不同实现有个清晰的认识。


传统IO(阻塞/非阻塞)

先来看看传统的IO模型,对于一个服务器有多个线程,每个线程处理1个客户端请求,包括连接、读取数据、业务处理、发送数据整个过程。
image-1700376646153

可以看出服务能处理的并发数,很大程度受线程数的影响。

说明:

  • 简单, 并发量不大时没有问题。
  • 并发量大时,比较依赖于服务器的线程数;
  • 线程多了,线程创建和线程调度等会消耗更多的CPU和内存;
  • read和send操作属于IO操作,与process串行处理,相当于它们是相互阻塞的,降低响应速度;

IO多路复用-Reactor模式

基于传统的IO,是否可以以更低的资源来处理更多的请求。JDK1.4提供了一套非阻塞IO的API(API相关内容可以看另一篇文章,这里主要说网络IO),本质上是借助操作系统select、poll、epoll这些内核的东西实现了一套以事件驱动的IO多路复用的网络IO操作。

Reactor(反应堆)就是基于这套实现而提出的以事件驱动的IO模型

这里面有两个概念:IO多路复用 和 Reactor。
Reactor已经说了,字面意思是反应堆,对事件的“反应”或“响应”,是一个抽象概念定义。

IO多路复用是五种IO模型的一种,可以看之前的文章,简单理解就是通过一个或几个线程监听一批网络连接的IO事件(包括连接、读、写)。

Reactor模式由Reactor和资源处理两部分组成:

  • Reactor:负责事件监听与分发,包括连接、读写事件。
  • 资源处理:负责处理事件,如数据读取、业务处理、数据发送。

常见的Reactor模式

  • 单Reactor单线程
  • 单Reactor多线程
  • 多Reactor多线程

线程可以换成进程,介绍主要以线程为主

单Reactor单线程

单Reactor单线程模式如下:
image-1700381198208

在该模式下,包括三个角色:Reactor、Acceptor、Handler

  • Reactor:监听和分发所有的IO事件。
  • Acceptor:处理连接事件。
  • Handler:处理读写事件和业务逻辑。

其中的accept、read、send属于系统调用函数,dispatch和process分别是事件分发和业务逻辑处理。

大致过程
1、Reactor通过IO多路复用接口(如Selector的select函数)监听事件,收到事件后进行分发;
2、如果是连接事件交由Acceptor,Acceptor通过accept获取连接,并创建Handler进行后续读写事件的处理;
3、如果是其它事件交由对应的Handler处理,进行读取、业务处理、写出等整个过程。

该方案解决了传统IO线程资源问题,且以事件通知方式驱动。但是也有其缺点
1、在进行Acceptor或者Handler时,无法同时处理其它连接,会造成并发下响应延迟;
2、单个线程处理无法充分利用CPU的多核优势。

但是对于能够快速进行业务处理的场景,如Redis这种内存数据库,6.0版本前,使用这种模式,也能很快,瓶颈不在CPU上,好处是没有资源的竞争和上下文切换。

单Reactor多线程

基于单Reactor单线程,优化出单Reactor多线程模式,如下:
image-1700382744003

如图,将IO事件与业务处理分离,在处理IO事件的同时可以进行业务逻辑处理,充分利用CPU资源。

但连接和读写IO事件仍在一个线程中处理,面对高并发,仍为成为性能瓶颈。

多Reactor多线程

单Reactor变多Reactor,将Reactor拆分成两种(不一定是两个),MainReactor和SubReactor:
MainReactor:主线程的MainReactor使用IO多路复用选择器监听IO连接事件,将其交由Acceptor处理,Acceptor将连接交由某个子线程;
SubReactor:子线程中的SubReactor对新连接进行监听读写事件,并创建Handler处理后续的事件。

如下图所示:
image-1700576935705

说明:
1、这里的SubReactor也是由线程池处理或几个固定子线程,业务处理process在另一个线程池中处理。
2、注意SubReactor的线程数并不等于请求数,1个SubReactor可以监听多个SockctChannel的网络IO事件。
3、多Reactor多线程看起来复杂,其实相对简单,体现在 1)、两种Reactor分工明细;2)、交互简单,主Reactor只需要新连接给子线程即可,剩下的交给子线程跟进。

被大家所熟知的Netty和Memcache都是采用此模式。


示例

服务端

import cn.hutool.core.thread.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.*;

/**
 * 多Reactor多线程模式。
 *
 * 主Reactor负责处理连接
 * 从Reactor负责处理读写
 */
@Slf4j
public class MultiReactor {


    // 3个线程,1个主Reactor、2个从Reactor
    private static final int THREAD_POLL_SIZE = 3;
    static ExecutorService reactorThreadPool = new ThreadPoolExecutor(THREAD_POLL_SIZE, THREAD_POLL_SIZE,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
            new NamedThreadFactory("ReactorThreadPoll-", false));

    int next = 0;

    // 主Reactor负责处理连接
    private Reactor mainReactor;
    // 从Reactor负责处理读写
    private Reactor[] subReactors = new Reactor[THREAD_POLL_SIZE - 1];

    // 监听端口
    private int port;

    public static void main(String[] args) throws IOException {
        MultiReactor multiReactor = new MultiReactor(8888);
        multiReactor.start();
    }

    /**
     * 分配线程
     */
    public MultiReactor(int port) throws IOException {
        this.port = port;
        mainReactor = new Reactor("mainReactor");
        for (int i = 0; i < subReactors.length; i++) {
            subReactors[i] = new Reactor("subReactor" + i);
        }
    }

    /**
     * 启动
     */
    private void start() throws IOException {
        // 主Reactor
        Thread mrThread = new Thread(mainReactor);
        //  创建ServerSocketChannel,注册selector,关注OP_ACCEPT事件
        new Acceptor(mainReactor.getSelector(), port);
        // reactor开始监听
        reactorThreadPool.execute(mrThread);

        // 从Reactor
        for (int i = 0; i < subReactors.length; i++) {
            Thread srThread = new Thread(subReactors[i]);
            reactorThreadPool.execute(srThread);
        }
    }


    /**
     * Reactor抽象,包括selector监听事件
     */
    static class Reactor implements Runnable {
        // 队列,主Reactor处理接收事件时,要在从Reactor注册读写事件,但是从Reactor当时正阻塞在 select() 方法上,
        // 所以借助队列,在达到先在select唤醒,及时注册,再select
        private ConcurrentLinkedQueue<SocketChannel> events = new ConcurrentLinkedQueue<>();

        // 用于识别当前reactor
        private String name;
        final Selector selector;
        public Reactor(String name) throws IOException {
            this.name = name;
            selector = Selector.open();
        }
        public Selector getSelector() {
            return selector;
        }

        @Override
        public void run() {
            // normally in a new Thread
            // 处理IO事件
            try {
                // 死循环
                while (!Thread.interrupted()) {
                    // 针对从Reactor,从select唤醒后,先尝试Register,不然内部有锁竞争
                    SocketChannel sChannel;
                    while ((sChannel = events.poll()) != null) {
                        // 初始化读写事件
                        new Handler(selector, sChannel);
                    }

                    // 阻塞,直到有通道事件就绪
                    log.info("{} 进入select监听", name);
                    selector.select();
                    // 拿到就绪通道 SelectionKey 的集合
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        SelectionKey skTmp = it.next();
                        // 根据 key 的事件类型进行分发
                        dispatch(skTmp);
                    }
                    // 清空就绪通道的 key
                    selected.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * 分发
         */
        void dispatch(SelectionKey key) {
            Runnable r = (Runnable) (key.attachment()); // 拿到通道注册时附加的对象
            if (r != null) r.run();
        }

        /**
         * 注册IO事件
         */
        void register(SocketChannel socketChannel) throws ClosedChannelException {
            events.offer(socketChannel);
            selector.wakeup();
        }
    }

    /**
     * 处理连接类
     */
    class Acceptor implements Runnable {
        final ServerSocketChannel serverSocketChannel;

        /**
         * 初始化通道,注册选择器,关注连接事件
         */
        public Acceptor(Selector selector, int port) throws IOException {
            serverSocketChannel = ServerSocketChannel.open();

            // bind
            ServerSocket serverSocket = serverSocketChannel.socket();
            serverSocket.bind(new InetSocketAddress(port));
            // 非阻塞
            serverSocketChannel.configureBlocking(false);

            // 注册到选择器,关注连接器事件
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            key.attach(this);

            log.info("Acceptor- mainReactor- new Acceptor listen port:{}", port);
        }

        @Override
        public void run() {
            // 处理连接事件
            try {
                log.info("Acceptor- 处理连接");
                SocketChannel sChannel = serverSocketChannel.accept();
                sChannel.configureBlocking(false);

                // 这个新连接主要用于从客户端读取数据,对OP_READ事件感兴趣
                Reactor subReactor = subReactors[next];
                subReactor.register(sChannel);
                if(++next == subReactors.length){
                    next = 0;
                }
                log.info("Acceptor- 处理连接完成");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 业务处理类
     */
    static class Handler implements Runnable {
        final SelectionKey selectionKey;
        // 可以加线程池处理

        public Handler(Selector selector, SocketChannel socketChannel) throws ClosedChannelException {
            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
            // 管理事件的处理程序
            key.attach(this);
            this.selectionKey = key;
        }

        @Override
        public void run() {
            // 处理读写事件
            SocketChannel sChannel = (SocketChannel) selectionKey.channel();
            try {
                String msg = readDataFromSocketChannel(sChannel);
                log.info("Handler- msg:{}", msg);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * 数据读取
         * @param sChannel
         * @return
         * @throws IOException
         */
        private String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocate(512);
            StringBuilder sb = new StringBuilder();

            buffer.clear();

            int read = sChannel.read(buffer);
            if (read <= 0) {
                return "";
            }

            // 读取buffer
            buffer.flip();
            // 可读大小
            int limit = buffer.limit();
            char[] dst = new char[limit];
            for (int i= 0; i < limit; i++) {
                dst[i] = (char)buffer.get(i);
            }
            sb.append(dst);
            buffer.clear();

            return sb.toString();
        }
    }
}

客户端

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * 升级版客户端
 */
@Slf4j
public class NIOClient2 {

    // 发送缓冲区
    private static ByteBuffer sendBuffer = ByteBuffer.allocate(1024);

    // 接收缓冲区
    private static ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);

    public static void main(String[] args) throws IOException {
        // 打开选择器
        Selector selector = Selector.open();

        // 打开Socket通道
        SocketChannel socketChannel = SocketChannel.open();
        log.info("socketChannel:{} hashCode:{}", socketChannel, socketChannel.hashCode());
        // 非阻塞方式
        socketChannel.configureBlocking(false);
        // 注册连接事件,SelectionKey表示Selector和Channel一个关系,充当粘合剂的作用。对OP_CONNECT事件感兴趣
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        // 发起连接 -> 非阻塞
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));

        while (true) {
            // 阻塞到有IO事件
            selector.select();

            // 已就绪键值,能找到对应的Channel
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();

            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isConnectable()) {
                    // 可连接
                    System.out.println("client触发连接事件");
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    log.info("socketChannel:{} connect hashCode:{}", socketChannel, socketChannel.hashCode());
                    // 是否正在进行连接操作
                    if (clientChannel.isConnectionPending()) {
                        clientChannel.finishConnect();
                        System.out.println("client完成连接事件");
                    }
                    // 注册读事件,对OP_READ和OP_WRITE事件感兴趣, 不能一直对OP_WRITE事件感兴趣,不然buffer有空间可写时会一直触发写事件
                    clientChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    // 新线程监听控制台
                    new Thread(() -> consultInput(clientChannel)).start();
                } else if (key.isReadable()) {
                    // 可读
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    receiveBuffer.clear();
                    // 读取数据
                    int count = clientChannel.read(receiveBuffer);
                    if(count > 0){
                        String receiveText = new String(receiveBuffer.array(), 0, count);
                        System.out.println("客户端接受服务器端数据--:" + receiveText);
                    }
                } else if (key.isWritable()) {
                    // 可写
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    sendBuffer.clear();
                    String sendText = "write server";
                    sendBuffer.put(sendText.getBytes(StandardCharsets.UTF_8));
                    sendBuffer.flip();
                    clientChannel.write(sendBuffer);
                    System.out.println("客户端发送服务器端数据--:" + sendText);
                    clientChannel.register(selector, SelectionKey.OP_READ);
                }
            }
            keys.clear();
        }


    }

    /**
     * 监听控制台的输入
     *
     * @param clientChannel
     */
    private static void consultInput(SocketChannel clientChannel) {
        // 通过consult获取输入
        Scanner in = new Scanner(System.in);
        while(true){
            if(in.hasNextLine()) {
                String line = in.nextLine();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.clear();
                buffer.put(line.getBytes(StandardCharsets.UTF_8));
                buffer.flip();
                try {
                    clientChannel.write(buffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

先启动服务端,再启动客户端,输出如下:
服务端:

Acceptor- mainReactor- new Acceptor listen port:8888
subReactor1 进入select监听
mainReactor 进入select监听
Acceptor- 处理连接
Acceptor- 处理连接完成
mainReactor 进入select监听
subReactor0 进入select监听
Handler- msg:write server
subReactor0 进入select监听

客户端:

socketChannel:java.nio.channels.SocketChannel[connection-pending remote=/127.0.0.1:8888] connect hashCode:353842779
client完成连接事件
客户端接受服务器端数据–:write server


扩展

Reactor与Proactor的区别:

  • Reactor是非阻塞同步网络模式,感知的是就绪可读写事件,仍需要应用进程进行阻塞式读写内核数据;
  • Proactor是异步网络模式,感知的是已完成的读写事件,数据已经被内核处理好了,通知应用进程,应用只需要操作应用内存数据。

这其实也是同步IO与异步IO的主要区别。

目前Window实现了真正的异步IO(IOCP技术),而Linux的aio系列不是真正操作系统级别的异步IO,是用户空间模拟出来的,且仅支持本地文件的异步IO,不支持Sockct网络IO,所以Linux下高性能网络程序都是基于Reactor。


总结

单Reactor单线程相对于传统IO处理,是一种分而治之的思想,但是它无法充分利用IO和CPU的资源;所以有了单Reactor多线程,将业务处理和IO处理进行了分离,将业务处理丢到线程池中处理,充分利用多核CPU,提高了效率;仍不能提高IO的速率,为了平衡IO的速率,使用多Reactor来饱和IO。

有场景有问题才会有优化手段;往往定义问题是解决服务问题的基础。分而治之通常是实现可伸缩性目标的最佳方法。

link:Scalable-IO-in-Java