前言

Java NIO(New Input/Output)是Java1.4引入的一种IO操作方式,相对于Java IO(BIO),它提供了更高效的操作IO的方式。

Java IO是面向流的方式,而且是单向的;而Java NIO是面向块的,借助通道可双向传输。

Java IO在调用read或write方式时,会阻塞;Java NIO在调用read或write时如果缓冲区没有数据可读或已经写满时,不会阻塞(即IO操作不阻塞)。

文章主要偏向于介绍网络操作的NIO。


NIO核心组件

Java NIO涉及一些核心的概念,包括通道(Channel)、缓冲区(Buffer)、选择器(Selector)。

对应的实现在java.nio包下面


通道(Channel)

通道可以理解成IO操作的一个纽带,通过一个打开的连接链到一个实体,这个实体可以是软件磁盘、文件、网络Socket、执行IO读写操作的程序组件。

通道是双向的,通常也是线程安全的,常见的Channel有:
FileChannel:从文件中读取数据;
DatagramChannel:通过UDP读取网络中的数据;
SocketChannel:通过TCP读取网络中的数据;
ServerSocketChannel:监听新进入的连接,为每个连接创建SocketChannel。

关于SocketChannel,可以同时读和写操作,但是任何时候只有1个线程在读和1个线程在写。可以这么理解,1个线程在读的时候,另一个线程需要等待该通道的读操作;写类似。
核心api:

  • open():创建一个尚未被连接的SocketChannel;
  • connect():连接到一个Socket地址;
  • finishConnect():完成连接,与connect相互同步,意思是调用finishConnect()会阻塞到connect()完成为止;read/write方法也与这两个连接方法同步互斥,确保数据安全。

缓冲区(Buffer)

缓冲区,应用程序发给通道Channel的所有数据需要放到Buffer中,从通道中读取数据需要先读到Buffer中。Buffer是线程不安全的。
Buffer的作用如下图所示:
image-1699975044380

缓冲区其实是一个数组,这个数组可以是内存数组,也可以是直接内存数组(堆外内存)。默认使用allocate()方法分配的是HeapByteBuffer。

缓冲区定义了7种原始数据类型的Buffer类,它们都是Abstract类型,如ByteBuffer、LongBuffer、BooleanBuffer等。其中Byte是1个字节长度,数据类型中最小的单元,ByteBuffer类中定了如获取int、long长度数据的方法。

缓冲区的状态变量包括mark、position、limit、capacity
(mark <= position <= limit <= capacity)

  • mark:标记,备忘位置,调用mark方法可以使其等于position,记录当前的position的位置。(默认-1)
  • position:当前已经读写的字节数。(默认0)
  • limit:最大可读写的字节数。(默认等于capacity)
  • capacity:容量大小,不可变。

假设有一个容量8个字节的缓冲区:
1、如果是写入Buffer
a、初始值:position=0(指向第一个字节);limit=capacity=8;

此时position表示已经写的字节数,limit表示可写的字节数

b、然后写入2个字节:position=2(指向第三个字节),limit=capacity=8;

2、如果是读取Buffer
c、基于1,调用flip()翻转Buffer信息,此时,Buffer变成可读取。此时:position=0(指向第一个字节);limit=2(指向第三个字节),capacity=8;

此时position表示已经读的字节数,limit表示可读的字节数

d、然后读取1个字节后,position=1(指向第二个字节);limit=2(指向第三个字节),capacity=8;

过程如下图所示:
image-1700022913033


选择器(Selector)

NIO实现或提供了IO多路复用Reactor模型的基础能力,一个线程可以用一个选择器Selector通过轮询的方式监听多个通道Channel上的IO事件。

通过配置监听的Channel为非阻塞的,那么到IO事件未到达时,selector不会阻塞在某个Channel上,而是轮询其它的Channel,找到已经到达IO事件的Channel进行处理,但是找的过程对应用程序来说还是阻塞的。

Selector的核心方法select,监听就绪的事件,返回对应SelectionKey个数。找到后通过SelectionKey集合匹配已经就绪的Channel进行处理。select方法的底层实现,取决于操作系统来执行select、poll、epoll等系统调用。

在将通道Channel注册到选择器Selector上的时候,需要注册具体感兴趣的事件,包括以下几类:

  • SelectionKey.OP_CONNECT = 1 << 3 = 8
  • SelectionKey.OP_ACCEPT = 1 << 4 = 16
  • SelectionKey.OP_READ = 1 << 0 = 1
  • SelectionKey.OP_WRITE = 1 << 2 = 4

粘合剂(SelectionKey)

在将通道注册到选择器上的时候,会创建SelectionKey实例,并标记感兴趣的事件集合。SelectionKey表示选择器(Seletor)与网络通道(Channel)之间的令牌,也即粘合剂。

SelectionKey的主要作用包括:

  • 兴趣集合:通道在注册到选择器时,配置感兴趣的事件,如SelectionKey.OP_READ,可以多个,用 | 拼接即可,最后创建一个SelectionKey实例;
  • 准备就绪集合:通道有可操作事件的时候,通过SelectionKey可以跟踪对应事件;
  • 附件:SelectionKey允许附加对象,做额外的记录
  • 与选择器交互:当选择器的select()方法被调用,选择器就会检查所有注册的通道是否有任何就绪的事件。如果有,选择器将会返回,并且相应的SelectionKey将被添加到选择器的已就绪键集合中,以表示它的通道已经准备好进行某些操作了;
  • 事件处理:程序可以通过检查每个就绪的SelectionKey来决定如何响应,如接受新连接、读取数据、写入数据等。

从构造器可以看出来,其组合了channel和selector

// sun.nio.ch.SelectionKeyImpl
// 感兴趣事件,如果是多个用 | 计算所得
private volatile int interestOps;
// 就绪事件
private int readyOps;
SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) {
		// 通道
        this.channel = var1;
        // 选择器
        this.selector = var2;
}

类图

Channel、Selector、SelectorProvider 类图如下:

image-1700628180965


示例

服务端

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

/**
 * nio服务端
 */
@Slf4j
public class NIOServer {

    public static void main(String[] args) throws IOException {

        Selector selector = Selector.open();

        // server的socketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // 注册selector,对OP_ACCEPT事件感兴趣
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // serverSocket
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress("127.0.0.1", 8888));

        while (true) {
            // 阻塞获取事件
            int select = selector.select();
            log.info("select count:{}", select);
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 连接事件
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel ssChannel = (ServerSocketChannel)selectionKey.channel();

                    // 服务器会为每个新连接创建一个 SocketChannel
                    SocketChannel sChannel = ssChannel.accept();
                    sChannel.configureBlocking(false);

                    // 这个新连接主要用于从客户端读取数据,对OP_READ事件感兴趣
                    sChannel.register(selector, SelectionKey.OP_READ);
                    log.info("acceptChannel:{} hashCode:{}", sChannel, sChannel.hashCode());
                } else if (selectionKey.isReadable()) {
                    // 可读
                    SocketChannel sChannel = (SocketChannel) selectionKey.channel();
                    log.info("readableChannel:{} hashCode:{}", sChannel, sChannel.hashCode());
                    String msg = readDataFromSocketChannel(sChannel);
                    log.info("msg:{}", msg);
                    // 不能close,否则无法再收到数据
                    // sChannel.close();
                }
                // 重要,这里移除的是selector内部的集合中的SelectionKey实例,对应已经处理过的io事件。
                // 不移除的话,如果此次的SelectionKey出现,与之前的SelectionKey集合不全相同,会出错;但如果是全部相同的SelectionKey再次出现,会判重相等而不会报错
                iterator.remove();
            }
        }
    }

    /**
     * 读取通道的数据
     * 正常需要数据解码,不然客户端发送多次,被一次读取完
     *
     * @param sChannel:socket通道
     */
    private static String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder sb = new StringBuilder();

        buffer.clear();

        int read = sChannel.read(buffer);
        if (read <= 0) {
            return "";
        }

        // 读取buffer
        buffer.flip();
        // 可读大小
        int limit = buffer.limit();
        char[] dst = new char[limit];
        for (int i= 0; i < limit; i++) {
            dst[i] = (char)buffer.get(i);
        }
        sb.append(dst);
        buffer.clear();

        return sb.toString();
    }

}

客户端

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * 升级版客户端
 */
@Slf4j
public class NIOClient2 {

    // 发送缓冲区
    private static ByteBuffer sendBuffer = ByteBuffer.allocate(1024);

    // 接收缓冲区
    private static ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);

    public static void main(String[] args) throws IOException {
        // 打开选择器
        Selector selector = Selector.open();

        // 打开Socket通道
        SocketChannel socketChannel = SocketChannel.open();
        log.info("socketChannel:{} hashCode:{}", socketChannel, socketChannel.hashCode());
        // 非阻塞方式
        socketChannel.configureBlocking(false);
        // 注册连接事件,SelectionKey表示Selector和Channel一个关系,充当粘合剂的作用。对OP_CONNECT事件感兴趣
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        // 发起连接 -> 非阻塞
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));

        while (true) {
            // 阻塞到有IO事件
            selector.select();

            // 已就绪键值,能找到对应的Channel
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();

            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isConnectable()) {
                    // 可连接
                    System.out.println("client触发连接事件");
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    log.info("socketChannel:{} connect hashCode:{}", socketChannel, socketChannel.hashCode());
                    // 是否正在进行连接操作
                    if (clientChannel.isConnectionPending()) {
                        clientChannel.finishConnect();
                        System.out.println("client完成连接事件");
                    }
                    // 注册读事件,对OP_READ和OP_WRITE事件感兴趣, 不能一直对OP_WRITE事件感兴趣,不然buffer有空间可写时会一直触发写事件
                    clientChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    // 新线程监听控制台
                    new Thread(() -> consultInput(clientChannel)).start();
                } else if (key.isReadable()) {
                    // 可读
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    receiveBuffer.clear();
                    // 读取数据
                    int count = clientChannel.read(receiveBuffer);
                    if(count > 0){
                        String receiveText = new String(receiveBuffer.array(), 0, count);
                        System.out.println("客户端接受服务器端数据--:" + receiveText);
                    }
                } else if (key.isWritable()) {
                    // 可写
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    sendBuffer.clear();
                    String sendText = "write server";
                    sendBuffer.put(sendText.getBytes(StandardCharsets.UTF_8));
                    sendBuffer.flip();
                    clientChannel.write(sendBuffer);
                    System.out.println("客户端发送服务器端数据--:" + sendText);
                    clientChannel.register(selector, SelectionKey.OP_READ);
                }
            }
            keys.clear();
        }


    }

    /**
     * 监听控制台的输入
     *
     * @param clientChannel
     */
    private static void consultInput(SocketChannel clientChannel) {
        // 通过consult获取输入
        Scanner in = new Scanner(System.in);
        while(true){
            if(in.hasNextLine()) {
                String line = in.nextLine();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.clear();
                buffer.put(line.getBytes(StandardCharsets.UTF_8));
                buffer.flip();
                try {
                    clientChannel.write(buffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

结果(先启动服务端,再启动客户端):

客户端部分提示:客户端发送服务器端数据–:write server
服务端部分提示:msg:write server

使用注意
1、处理完的SelectionKey须移除,防止某些情况重复处理(见服务端代码注释,见iterator.remove());
2、如果同时对多个事件感兴趣,可以这么写:SelectionKey.OP_READ | SelectionKey.OP_WRITE
3、不能一直对OP_WRITE事件感兴趣,不然buffer有空间可写时会一直触发写事件


总结

NIO中核心的几个组件,包括Channel、Buffer、Selector、SelectionKey。他们都在java.nio包下面。

选择器Selector的出现,可以使得1个线程监听多个客户端的通道,感知每个通道上不同的IO事件,并通过SelectionKey来关联到对应的Channle进行处理,无疑是借助了内核的实现。