Java IO 系列 | NIO-1.0一文速览

前言

上篇文章中,我们梳理了Java 经典IO,虽未详尽,但已基本满足日常知识储备需求。

本篇文章,我们顺着Java IO的发展史,梳理 NIO-1.0 的基础知识,显然,1.0体现了版本的概念,此时 NIO 应解释为 Java new IO 更加合理。

本篇将按照以下脉络展开,已经熟练掌握的章节,建议直接跳过

desc

NIO-1.0概述

在先前多线程系列中,我们提到JVM的线程非常贴近操作系统中的线程;在JAVA经典IO中,IO的阻塞会体现到JVM中的线程上。

因此,在高并发IO场景下,如果以"线程-IO一对一"的方式实现,开启、调度线程的资源消耗会演变为性能瓶颈。

在JAVA 1.4 的 NIO-1.0 中,对此情况进行了设计改进,使用:通道(Channel)+ 选择器(Selector)+ 缓冲区(Buffer) 形成组合拳。

Channel概述

经典IO中,立足于 数据的流动 的角度进行了抽象,提出了 Stream,在 NIO-1.0中,立足于 IO操作,对IO中的关键角色进行了抽象,Channel 是整个操作的中心。

JDK文档中这样描述:

A nexus for I/O operations.

A channel represents an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.

A channel is either open or closed. A channel is open upon creation, and once closed it remains closed. Once a channel is closed, any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown. Whether or not a channel is open may be tested by invoking its isOpen method.

Channels are, in general, intended to be safe for multithreaded access as described in the specifications of the interfaces and classes that extend and implement this interface.

大体信息如下:

Channel 是 I/O 操作的中心。

  1. Channel 代表与实体(如硬件设备、文件、网络套接字或程序组件)的打开连接,它能够执行一种或多种不同的 I/O 操作,例如读取或写入。
  2. 一个 Channel 要么打开要么关闭。Channel 在创建时打开,关闭后将保持关闭状态。一旦 Channel 关闭, 在其上调用 I/O 操作时,都会导致抛出 ClosedChannelException。可以通过 isOpen 方法来测试 Channel 是否打开。
  3. 通常,Channel 旨在支持多线程访问安全,与其接口、扩展和实现类中所陈述的规范保持一致。

用一个草图描述:通道数据应用输入源/输出目标 之间的关系:

desc

不难理解:它是 I/O操作的中心

主要的 Channel 类有:

  • FileChannel: 主要用于文件IO
  • DatagramChannel: 主要用于UDP网络IO
  • SocketChannel: 主要用于TCP网络IO
  • ServerSocketChannel: 主要用于监听TCP连接

值得注意:虽然Channel宣称是线程安全的,但不代表多线程并发写入都是合理的,仍然需要考虑写入的顺序和位置,以避免数据覆盖等问题,Zero-Copy暂不讨论

排除掉大量干扰后,可摘出如下UML图:

desc

Buffer 概述

Buffer 是特定基本类型的数据容器, 因 Channel 本身并不专注于内存中存储数据的细节,它配合 Channel 实现数据传输。

Buffer 可以从 数据类型是否使用堆内存访问方式 三个维度进行分类

按照数据类型分,JDK中主要包含有:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

见名知意,不在赘述,均直接继承自抽象类 Buffer

desc

整体实现思路一致,暂不展开。

按照是否使用堆内内存可以分为

  • 继承自 XXXXBufferHeapXXXXBuffer,如 HeapByteBuffer,它们使用JVM可以管理的堆内内存。
  • DirectBuffer 接口实现类,如 DirectByteBuffer,它们使用堆外内存,所谓堆外是相对概念,此内存区不受JVM管理,使用Unsafe类自行管理

按照访问方式裂变出 只读Buffer,如 HeapByteBufferR, DirectByteBufferR

Channel+Buffer 基本使用代码示例

以读取文件举例,使用方式大体如下:

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileChannelRead {
    public static void main(String[] args) throws IOException {
        String filePath = "/path/of/file";
        FileInputStream fis = new FileInputStream(filePath);
        FileChannel channel = fis.getChannel();

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        while (channel.read(buffer) != -1) {
            // Prepare for reading
            buffer.flip();

            while (buffer.hasRemaining()) {
                byte b = buffer.get();
                // Do something with b
            }

            // Prepare for writing
            buffer.clear();
        }

        channel.close();
        fis.close();
    }
}

Selector 模型

文首我们提到:

在JAVA经典IO中,IO的阻塞会体现到JVM中的线程,在高并发IO场景下,开启、调度线程的资源消耗也会演变为性能瓶颈。

原因在于:经典IO无法在单一线程中应对多个IO等待数据就绪的场景。

不难理解,NIO1.0 需要解决这一问题,因此需要设计有 Selector ,它允许单个线程处理多个 Channel,具体表现为: Selector 可检测多个注册的Channel上是否有新连接、数据可读或数据可写事件,据此遴选出Channel进行处理。这样可以实现:单一线程管理多个channel

其工作原理本质为多路复用。可以体现为:

  • 将 Channel 注册到 Selector 上,并指定监听事件
  • 反复调用 selector.select() 方法,该方法会一直阻塞,直到监测到 Channel 有事件发生
  • select() 方法返回后,可以通过 Selector 获得的 SelectionKey 集合,判断事件发生的 Channel
  • 对 Channel 执行 IO 操作

模型示意图如下:

desc

此图片通过某度图片检索获得,未得到具体出处,如有侵权请留言联系删除

Selector的具体实现不再展开,它由spi机制实现特定提供。如果读者诸君感兴趣,可自行阅读 EPollSelectorImpl 等源码。

JDK中的 DefaultSelectorProvider 是默认的提供逻辑,根据 不同的操作系统 创建不同的 SelectorProvider,用于提供 SelectorImpl

  • Windows操作系统,则创建 WindowsSelectorProvider 实例。
  • MacOS操作系统,则创建 KQueueSelectorProvider 实例。
  • Linux操作系统,则创建 EPollSelectorProviderPollSelectorProvider 实例。

Selector使用 与 SelectionKey

读者诸君,可还记得上文中Channel的UML类图,Selector 面向 SelectableChannel 因此,FileChannel 是无法使用的。

SelectionKey

SelectionKey 是 可选择通道在 Selector 中注册的令牌, SelectionKey 有4个事件类型:

/**
 * 读事件:0000 0001
 */
public static final int OP_READ = 1 << 0;
 
/**
 * 写事件:0000 0100
 */
public static final int OP_WRITE = 1 << 2;
 
/**
 * 连接事件:0000 1000,连接操作,Client端支持的一种操作
 */
public static final int OP_CONNECT = 1 << 3;
 
/**
 * 接受事件:0001 0000,可接受操作,仅ServerSocketChannel支持
 */
public static final int OP_ACCEPT = 1 << 4;

常用API

interestOps()

返回在Selector中注册的关注事件集,可以通过位运算反解判断 Channel 是否在 Selector 中注册某关注事件

参考如下:

// 获取selectionKey 关注事件集
int interestSet = selectionKey.interestOps(); 
 
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) != 0;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT != 0;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ != 0;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE != 0;

readyOps()

获取此selectionKey 对应的通道上已经就绪的事件集。可使用位运算判断相应事件,也可以直接使用API:

  • boolean isAcceptable()
  • boolean isConnectable()
  • boolean isReadable()
  • boolean isWritable()

channel()

获取注册时对应的Channel

Channel channel = selectionKey.channel();

selector()

获取注册时目标Selector

Selector selector = selectionKey.selector();

attachment()

SelectionKey支持绑定一个附加对象

//注册时绑定
SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_READ,obj);

//更新绑定
selectionKey.attach(obj);

通过 selectionKey.attachment(); 可获取该附加对象。

需要注意:当不再需要该附加对象时,需解除绑定 selectionKey.attach(null),否则影响GC

SelectionKey的底层实现细节不再展开。

Selector 常用API

select(),select(timeout),selectNow()

获取 存在已就绪事件的通道 的数量,

当返回值不为0时,可进行IO处理

selectedKeys()

获取注册于该 Selector 上的 事件就绪SelectionKey,参考上文中 SelectionKey进行使用,以下代码是一个使用示例:

Set<SelectionKey> selectedKeys = selector.selectedKeys();
 
Iterator<SelectionKey> it = selectedKeys.iterator();
 
while(it.hasNext()) {
    SelectionKey key = keyIterator.next();
 
    if(key.isXXXXable()) {
     //处理 XXXX 事件
    }
 
   it.remove();
}

wakeUp()

当调用 select() 阻塞时,于另一个线程调用 wakeup() 可强行唤醒阻塞的线程,即select()方法立即返回。

如果调用wakeup()时,没有线程阻塞于 select() 上,下次调用 select() 将立即返回,不会进入阻塞状态。和 LockSupport.unpark() 类似。

close()

关闭 Selector,已注册的 SelectionKey 将失效,但不影响 Channel

Buffer API图解

缓冲区本质上是一个数组,但是它通过一系列的方法来追踪和访问数据,这使得它看起来像一个列表或队列。JDK中原汁原味的描述:"a linear, finite sequence of elements of a specific primitive type."

去掉其包含的内容数据不谈,缓冲区有3个属性:

  • 容量(Capacity):最大存储量,初始化时确定,不可改变
  • 位置(Position):下一个要读或写的元素索引
  • 限制(Limit):缓冲区不可操作的下一个元素的位置,limit <= capacity

沿用上文中读取文件的代码示例,其主要使用方式:

  • Buffer 中写入数据,如 channel.read(buffer),此时为写模式
  • 调用 flip() 切换为读模式
  • Buffer 中取出数据,如 buffer.get()
  • 调用 clear()compact() 切换为写模式

重复直到读取完成(或异常)。

API

下文将结合以下Demo代码介绍API,图示其属性变化

class Demo {
    void demo() {
        // 创建一个长度为10的ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(10);

        // 写入5个字节到缓冲区
        buffer.put((byte) 1);
        buffer.put((byte) 2);
        buffer.put((byte) 3);
        buffer.put((byte) 4);
        buffer.put((byte) 5);

        // 翻转缓冲区 准备读取 
        buffer.flip();

        // 读取2个字节 
        byte a = buffer.get();
        byte b = buffer.get();

        buffer.rewind();

        // 清空缓冲区  
        buffer.clear();
    }
}

分配空间

使用JVM堆内存的Buffer:

ByteBuffer#allocate(int capacity)

使用堆外内存的Buffer:

ByteBuffer#allocateDirect(int capacity)

申请一个容量为10的Buffer,此时:

  • position 指向0
  • limit 和 capacity 为10,指向最大的位置9
desc

向Buffer中写入数据

以ByteBuffer为例,具有以下重载API,参数见名知意:

ByteBuffer#put(byte b)
ByteBuffer#put(int index, byte b)
ByteBuffer#put(ByteBuffer src)
ByteBuffer#put(byte[])
ByteBuffer#put(byte[] src, int offset, int length)

而在 CharBuffer 等Buffer子簇中,针对其目标数据类型具有相应的API

而JDK中以Byte为主要数据类型进行基础功能的实现,下文中如无必要,将只讨论以Byte为目标数据的功能实现类。

作者按:读者诸君,如果您对JAVA的泛型和设计模式有一定了解,可能会思考 "为何JDK中的代码不利用泛型进行更系统性的抽象",但注意:JDK1.4中尚无泛型

依次写入5个byte之后:

  • position 指向5
  • limit 和 capacity 为10,指向最大的位置9
desc

flip

翻转Buffer:

Buffer#flip()

public Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

如Doc所描述,flip干三件事情:The limit is set to the current position and then the position is set to zero. If the mark is defined then it is discarded.

不难理解,它的设计意图是将Buffer从 "写" 的工作模式切换到 "读" 的工作模式,并限定读取的边界。

在写入5个数据后flip:

  • position 指向0
  • limit 为5, capacity 为10
desc

从Buffer中读出数据

ByteBuffer#get() :byte
ByteBuffer#get(int index) :byte
ByteBuffer#get(byte[] dst)
ByteBuffer#get(byte[] dst, int offset, int length)

参数见名知意,不再赘述。

继续flip后的操作,读取两个byte数据,此时:

  • position 指向2
  • limit 为5, capacity 为10
desc

rewind

public Buffer rewind() {
    this.position = 0;
    this.mark = -1;
    return this;
}

使用rewind可以重置 mark标记position,此时可以从头再次读取缓冲区内容。

clear & compact

public Buffer clear() {
    this.position = 0;
    this.limit = this.capacity;
    this.mark = -1;
    return this;
}

//HeapByteBuffer
public ByteBuffer compact() {
    int var1 = this.position();
    int var2 = this.limit();

    assert var1 <= var2;

    int var3 = var1 <= var2 ? var2 - var1 : 0;
    System.arraycopy(this.hb, this.ix(var1), this.hb, this.ix(0), var3);
    this.position(var3);
    this.limit(this.capacity());
    this.discardMark();
    return this;
}

clear 将重置所有属性,与初始化的状态一致,但并不会清除数据。不难理解:各个标记已经保障读写安全,因此历史缓冲不会导致脏数据问题

compact 将清除已读数据,并将属性处理为满足 "在未读数据后写入"

desc

mark & reset

mark和reset需要合并使用, mark用于在当前position打标记,进过一些行为后可能position发生变化,通过reset将position恢复到先前的标记。

public Buffer mark() {
    this.mark = this.position;
    return this;
}

public Buffer reset() {
    int var1 = this.mark;
    if (var1 < 0) {
        throw new InvalidMarkException();
    } else {
        this.position = var1;
        return this;
    }
}

唠叨几句

上个月立了个FLAG,放言端午前写完IO系列。在内心也希望在今年多产出一些基础文章。

  • 一来是看看温故是否能知新;
  • 二来是基础文章对刚入行或者准备入行的小兄弟们帮助大一些;
  • 三来是对阐述理念类的文章还没有做好充足的准备;

但计划赶不上变化,我也未能想到,在不换坑的情况下,我的工作性质发生了巨大改变,这种改变尚未到谈论好坏的阶段,但确实要耗费太多心神,以至于我的零碎时间往往用于情绪垃圾回收,而没心思再做它顾。

过于絮叨了。