Java NIO入门篇


Java NIO入门篇

第一章 Java NIO概述

1.1 IO 概述

IO 的操作方式通常分为几种:同步阻塞 BIO、同步非阻塞 NIO、异步非阻塞 AIO。

  1. 在 JDK1.4 之前,我们建立网络连接的时候采用的是 BIO 模式。
  2. Java NIO(New IO 或 Non Blocking IO)是从 Java 1.4 版本开始引入的一个新的
    IO API,可以替代标准的 Java IO API。NIO 支持面向缓冲区的、基于通道的 IO 操作。
    NIO 将以更加高效的方式进行文件的读写操作。BIO 与 NIO 一个比较重要的不同是,
    我们使用 BIO 的时候往往会引入多线程,每个连接对应一个单独的线程;而 NIO 则是
    使用单线程或者只使用少量的多线程,让连接共用一个线程。
  3. AIO 也就是 NIO 2,在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的IO 模型。

1.2 阻塞IO(BIO)

阻塞IO(BIO)是最传统的一种IO模型,即在读写数据过程中会发生阻塞现象,直至有数据可供读取或者写入。

  1. 在BIO模式中,服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求,这种模式虽然简单方便,但由于服务器为每个客户端的连接都采用一个线程去处理,使得资源占用非常大。因此当连接数量达到上限时,如果再有用户请求连接,直接会导致资源瓶颈,严重的可能会直接导致服务器崩溃。

  2. 大多数情况下为了避免上述问题,都采用了线程池模型。也就是创建一个固定大小的线程池,如果有客户端请求,就从线程池中取一个空闲线程来处理,当客户端处理完操作之后,就会释放对线程的占用。因此这样就避免为每一个客户端都要创建线程带来的资源浪费,使得线程可以复用。但线程池也存在弊端,如果连接大多是长连接,可能会导致在一段时间内,线程池中的线程都被占用,那么当再有客户端请求连接时,由于没有空闲线程来处理,就会导致客户端连接失败。传统的BIO模式如下图所示:

    client-server结构

1.3 非阻塞IO(NIO)

基于BIO的各种弊端,在JDK1.4开始出现了高性能IO设计模式非阻塞IO(NIO)。

  1. NIO采用非阻塞模式,基于Reactor模式的工作方式,I/O调用不会被阻塞,它的实现过程是:会先对每个客户端注册感兴趣的事件,然后有一个线程专门去轮询每个客户端是否有事件发生,当有事件发生时,便顺序处理每个事件,当所有事件处理完之后,便再转去继续轮询。如下图所示:

    事件处理过程

  2. NIO中实现非阻塞I/O的核心对象就是Selector,Selector就是注册各种I/O事件的地方,而且当我们感兴趣的事件发生时,就是这个对象告诉我们所发生的事件,如下图所示:

    Selector注册

  3. NIO的最重要的一个地方是当一个连接创建后,不需要对应一个线程,这个连接会被注册到多路复用器上面,一个选择器线程可以同时处理成千上万个连接,系统不必创建大量的线程,也不必维护这些线程,从而大大减小了系统的开销。

    IO NIO
    面向流(Stream Oriented) 面向流缓冲区(Buffer Oriented)
    阻塞IO(Blocking IO) 非阻塞IO(Non Blocking IO)
    (无) 选择器(Selectors)

1.4 异步非阻塞IO(AIO)

  1. AIO也就是NIO 2,在Java7中引入了NIO的改进版NIO 2,它是异步非阻塞的IO模型。异步IO是基于事件和回调机制实现的,也就是说AIO模式不需要Selector操作,而是事件驱动形式,也就是当客户端发送数据之后,会主动通知服务器,接着服务器再进行读写操作。
  2. Java的AIO API其实就是Proactor模式的应用,和Reactor模式类似。

Reactor和Proactor模式的主要区别就是真正的读取和写入操作是由谁来完成的,Reactor中需要应用程序自己读取或者写入数据,而Proactor模式中,应用程序不需要进行实际的读写过程,它只需要从缓存区读取或者写入即可,操作系统会读取缓存区或者写入缓存区到真正的IO设备。

1.5 NIO概述

Java NIO由以下几个核心部分组成:

  • Channels
  • Buffers
  • Selectors

虽然Java NIO中除此之外还有很多类和组件,但是Channel,Buffer和Selector构成了核心的API。其他组件,如Pipe和FileLock,只是上述三个组件共同使用的工具类。

1.5.1 Channel

Channel和IO中的Stream(流)是差不多同级别的。区别在于Stream是单向,如:InputStream,OutputStream。而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。

NIO中的Channel主要实现有:FileChannel、DatagramChannel、SocketChannel和ServerSocketChannel。分别对应文件IO、UDP和TCP(Server和Client)

1.5.2 Buffer

NIO中的关键Buffer实现有:ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer,分别对应基本数据类型:byte,char,double,float,int,long,short

1.5.3 Selector

Selector运行单线程处理多个Channel。如,在一个聊天服务器中。要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子如:接收到一个新的连接、数据接收等。

1.5.4 Channel Buffer Selector三者关系

  1. Channel就像一个流,但是是双向的。读数据时,从Channel中读取数据,然后将数据写到Buffer中;写数据时,从Buffer中读取数据,写到Channel中。

    channel-buffer

  2. 一个Selector允许一个线程处理多个Channel

    channel-selector

第二章 Java NIO(Channel)

2.1 Channel概述

Channel和Stream的区别:

  • 既可以从Channel中读取数据,又可以写数据到通道。但Stream流的读写通常是单向的。
  • Channel可以异步的读写
  • Channel中的数据必须依赖于Buffer,例如:读数据需要将读到的数据放到Buffer中,写数据则需要读取Buffer中的数据,然后写入到通道中。

channel-buffer

2.2 Channel实现

以下为Channel中常用的Channel的实现:

种类 描述
FileChannel 从文件中读写数据
DatagramChannel 通过UDP读写网络中的数据
SocketChannel 通过TCP读写网络中的数据
ServerSocketChannel 可以监听新进来的TCP连接,类似于Web服务器。
每一个新进来的连接都会创建一个SocketChannel

2.3 FileChannel常用API

方法 描述
int read(ByteBuffer dst) 从Channel 中读取数据到ByteBuffer
long read(ByteBuffer[] dsts) 将Channel 中的数据“分散”到ByteBuffer[]
int write(ByteBuffer src) 将ByteBuffer 中的数据写入到Channel
long write(ByteBuffer[] src) 将ByteBuffer[] 中的数据“聚集”到Channel
long position() 返回此通道的文件位置
FileChannel position(long p) 设置此通道的文件位置
long size() 返回此通道的文件的当前大小
FileChannel truncate(long s) 将此通道的文件截取为给定大小
void force(boolean metaData) 强制将所有对此通道的文件更新写入到存储设备中

2.4 FileChannel操作

2.4.1 FileChannel的创建与关闭

可通过使用 InputStream、OutputStream或RandomAccessFile来获取一个FileChannel实例。

 // 创建FileChannel
RandomAccessFile file = new RandomAccessFile("E:\\java\\nio\\test\\test.txt","rw");
FileChannel channel = file.getChannel();
// 关闭channel
channel.close();

2.4.2 从FileChannel读数据

使用FileChannel读取数据到Buffer

public class FileChannelReadDemo {
    // FileChannel读取数据到buffer中
    public static void main(String[] args) throws IOException {
        // 创建FileChannel
        RandomAccessFile file = new RandomAccessFile("E:\\java\\nio\\test\\test.txt","rw");
        FileChannel channel = file.getChannel();
        // 创建Buffer缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // 将FileChannel中的数据读取到buffer中,返回值为读取到的文件位置
        int bytesRead = channel.read(buffer);
        // 以字符的形式打印读取到的文件数据,-1为读取到了文件的结尾
        while (bytesRead != -1) {
            System.out.println("读取文件的字节数: " + bytesRead);
            // 翻转缓冲区,将limit设置为当前position的值,将position置为0
            buffer.flip();
            // 判断是否读取结束
            while (buffer.hasRemaining()) {
                // 打印文件内容
                System.out.print((char)buffer.get());
            }
            System.out.println();
            buffer.clear();
            bytesRead = channel.read(buffer);
        }
        channel.close();
        System.out.println("结束了");
    }
}

2.4.3 向FileChannel写数据

使用FileChannel.write()方法向FileChannel写数据该方法参数是一个buffer,读取buffer中的数据写到channel中

public class FileChannelWriteDemo {
    public static void main(String[] args) throws IOException {
        // 打开FileChannel
        RandomAccessFile accessFile = new RandomAccessFile("E:\\java\\nio\\test\\test.txt", "rw");
        FileChannel channel = accessFile.getChannel();

        // 创建buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String testStr = "test write";
        buffer.clear();
        // 写入数据
        buffer.put(testStr.getBytes());

        // 写模式转为读模式
        buffer.flip();

        // FileChannel写入数据
        while (buffer.hasRemaining()) {
            channel.write(buffer);
        }
        channel.close();
        System.out.println("执行结束");
    }
}

2.4.4 FileChannel的position方法

position() 获取FileChannel的当前位置

postion(long pos) 设置FileChannel的当前位置

通过调用position()方法,可以在FileChannel的某个特定位置进行数据的读/写操作

例:

long pos = channel.position();
channel.position(pos+123);

如果将位置设置在文件结束符之后,然后试图从文件通道中读取数据,读方法将返回- 1 (文件结束标志)。
如果将位置设置在文件结束符之后,然后向通道中写数据,文件将撑大到当前位置并
写入数据。这可能导致“文件空洞”,磁盘上物理文件中写入的数据间有空隙

2.4.5 FileChannel的size方法

FileChannel实例的size()方法将返回该实例所关联文件的大小。

如:long fileSize = channel.size();

2.4.6 FileChannel的truncate方法

可以使用FileChannel.truncate()方法截取一个文件。截取文件时,文件中指定长度后面的部分将被删除

如:channel.truncate(1024); 截取文件的前1024个字节。

2.4.7 FileChannel的force方法

FileChannel.force()方法将通道里尚未写入磁盘的数据强制写到磁盘上。出于性能方面的考虑,操作系统会将数据缓存在内存中,所以无法保证写入到FileChannel里的数据一定会及时写到磁盘上。要保证这一点,需要调用force()方法。

force()方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。

2.4.8 FileChannel的transferTo和transferFrom方法

通道之间的数据传输:

如果两个通道中有一个是FileChannel,可以直接将数据从一个channel传输到另外一个channel。

(1)transferFrom()方法

FileChannel的transferFrom()方法可以将数据从源通道传输到FileChannel中(注:此方法在JDK文档中的解释为将字节从给定的可读取字节通道传输到此通道的文件中)。

(2)tranferTo()方法

transferTo()方法将数据从FileChannel传输到其他的channel中。

例:

public class FileChannelTransferDemo {

    // transferFrom()
    public static void main(String[] args) throws IOException {
        // 创建两个channel
        RandomAccessFile aFile = new RandomAccessFile("E:\\java\\nio\\test\\transferFromTest.txt", "rw");
        FileChannel fromChannel = aFile.getChannel();
        RandomAccessFile bFile = new RandomAccessFile("E:\\java\\nio\\test\\transferFromTest1.txt", "rw");
        FileChannel toChannel = bFile.getChannel();

        // fromChannel传输到 toChannel
        long position = 0;
        long size = fromChannel.size();
        // 使用transferFrom的方式
        // toChannel.transferFrom(fromChannel,position, size);
        // 使用transferTo的方式
        fromChannel.transferTo(position, size, toChannel);
        aFile.close();
        bFile.close();
        System.out.println("transferFrom test over!");
    }
}

2.5 Scatter/Gather

Java NIO开始支持scatter/gather,scatter/gather用于描述从Channel中读取或者写入到Channel的操作。

分散(scatter)从Channel中读取是指在读操作时将读取的数据写入多个buffer中,因此,Channel将从Channel中读取的数据“分散(scatter)”到多个buffer中。

聚集(gather)写入Channel是指在写操作时将多个buffer的数据写入同一个Channel,因此,Channel将多个buffer的数据“聚集(gather)”后发送到Channel。

scatter/gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

2.5.1 Scattering Reads

Scattering Reads(分散读取)是指数据从一个channel读取到多个buffer中。如下:

scattering-reads

例:

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = {header, body};
channel.read(bufferArray);

创建两个buffer对象,然后放入到bufferArray数组中,再将数组作为channel.read()方法中的输入参数,read()方法将按照buffer对象在数组中的顺序依次将数据写入到buffer中。

Scattering Reads在移动到下一个buffer前,必须写满当前的buffer,这也意味着它不适用于动态消息(注:消息大小(长度)不固定)。也就是说,若存在消息头和消息体,消息头必须完成buffer的填充(如128byte),Scattering Reads才能正常工作。

2.5.2 Gathering Writes

Gathering Writes(收集写入)是指数据从多个buffer写入到同一个channel。如下:

gathering-writes

例:

ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = {header, body};
channel.write(bufferArray);

创建两个buffer对象,然后放入到bufferArray数组中,再将数组作为channel.write()方法中的输入参数,write()方法将按照buffer对象在数组中的顺序依次将数据取出并写入到channel中。注:只有处于position和limit之间的数据才会被取出并被写入。如果一个buffer的capacity为128byte,但是仅仅包含64byte的数据,那么只有这64byte的数据将被取出并写入到channel中。因此,与Scattering Reads相反,Gathering Writes能较好的处理动态消息。

第三章 Java NIO(SocketChannel)

(1)SocketChannel就是NIO对于非阻塞Socket操作的支持的组件,其在Socket上封装了一层,主要是支持了非阻塞的读写。同时改进了传统的单向流API,Channel同时支持读写。

(2)socket通道类主要分为DatagramChannelSocketChannelServerSocketChannel,它们在被实例化时都会创建一个对等socket的对象。要把一个socket通道置于非阻塞模式,需要依靠所有socket通道类的公有超级类:SelectableChannel

就绪选择(readiness selection)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。

非阻塞IO和可选择性是紧密相连的,这也正是管理阻塞模式的API代码要在SelectableChannel超级类中定义的原因。

(3)设置或重新设置一个通道的阻塞模式:调用configureBlocking()方法,传递参数值为true(如:channel.configureBlocking(true))则设置为阻塞模式,参数值为false则为非阻塞模式。可以通过isBlocking()方法来判断某个socket通道当前处于哪种模式。

AbstractSelectableChannel.java中实现的configureBlocking()方法如下:

public final SelectableChannel configureBlocking(boolean block)
        throws IOException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if (blocking == block)
                return this;
            if (block && haveValidKeys())
                throw new IllegalBlockingModeException();
            implConfigureBlocking(block);
            blocking = block;
        }
        return this;
    }

下面分别介绍这三个通道:

3.1 ServerSocketChannel

ServerSocketChannel是一个基于通道的socket监听器。同java.net.ServerSocket执行相同的任务,不过它增加了通道语义,因此能够在非阻塞模式下运行。

由于ServerSocketChannel没有bind()方法,因此需要取出对等的socket并使用它来绑定到一个端口以开始监听连接。同时也是使用对等ServerSocket的API来根据需要设置其他的socket选项。

同java.net.ServerSocket一样,ServerSocketChannel也有accept()方法。

ServerSocketChannel的accept()方法会返回SocketChannel类型对象,SocketChannel可以在非阻塞模式下运行。

(1)打开ServerSocketChannel

通过调用ServerSocketChannel.open()方法来打开ServerSocketChannel。

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

(2)关闭ServerSocketChannel

通过调用ServerSocketChannel.close()方法来关闭ServerSocketChannel。

(3)监听新的连接

通过ServerSocketChannel.accept()方法监听新进入的连接。当accept()方法返回时,它返回一个包含新进来的连接的SocketChannel。因此,accept()方法会一直阻塞,直到有新的连接到达。

通常不会仅仅只监听一个连接,在while循环中调用accept()方法,如下demo:

while (true) {
    System.out.println("等待连接...");
    SocketChannel socketChannel = serverSocketChannel.accept();
    if (socketChannel == null) {
        System.out.println("无连接,连接对象为null");
        Thread.sleep(2000);
    } else {
        System.out.println("接受连接,连接来自: " + socketChannel.socket().getRemoteSocketAddress());
        // 将缓冲区的position置为0,为了读取数据
        byteBuffer.rewind();
        socketChannel.write(byteBuffer);
        System.out.println();
        socketChannel.close();
    }
}

(4)阻塞模式

如果channel.configureBlocking(true),将会阻塞住当前进程,直到有连接进入。

(5)非阻塞模式

如果channel.configureBlocking(false),为非阻塞模式,在此模式下,accept()方法会立刻返回。对于没有新进入的连接,将返回null。

以下为一个ServerSocketChannel的简单demo:

package com.hydrangea.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * @Author hydrangea
 * @Date 2023/3/28 11:41
 * @Description
 */
public class ServerSocketChannelDemo {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 端口号
        int port = 8888;

        // 创建buffer
        ByteBuffer byteBuffer = ByteBuffer.wrap("hello ServerSocketChannel".getBytes());

        // 创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定要监听的端口
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        // 设置非阻塞模式
        serverSocketChannel.configureBlocking(false);

        // 一直是否有新的连接传入
        while (true) {
            System.out.println("等待连接...");
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                System.out.println("无连接,连接对象为null");
                Thread.sleep(2000);
            } else {
                System.out.println("接受连接,连接来自: " + socketChannel.socket().getRemoteSocketAddress());
                // 将缓冲区的position置为0,为了读取数据
                byteBuffer.rewind();
                socketChannel.write(byteBuffer);
                System.out.println();
                socketChannel.close();
            }
        }
    }
}

3.2 SocketChannel

3.2.1 SocketChannel介绍

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道。

SocketChannel是一种面向流连接sockets套接字的可选择通道。

  • SocketChannel用来连接Socket套接字
  • SocketChannel是主要用来处理网络IO的通道
  • SocketChannel基于TCP连接传输
  • SocketChannel实现了可选择通道,可被多路复用

3.2.2 SocketChannel特征

  1. 对于已经存在的socket不能创建SocketChannel
  2. SocketChannel中提供的 open 接口创建的 Channel 并没有进行网络级联,需要使用connect接口连接到指定地址
  3. 未进行连接的SocketChannel执行IO操作时,会抛出NotYetConnectedException
  4. SocketChannel支持两种IO模式:阻塞式和非阻塞式
  5. SocketChannel支持异步关闭。如果SocketChannel在一个线程上read阻塞,另一个线程对该SocketChannel在一个线程上write阻塞,另一个线程对该SocketChannel调用shutdownWrite()方法,则写阻塞的线程将抛出AsynchronusCloseException
  6. SocketChannel支持设定参数
    1. SO_SNDBUF 套接字发送缓冲区大小
    2. SO_RCVBUF 套接字接收缓冲区大小
    3. SO_KEEPALIVE 维持连接
    4. O_REUSEADDR 复用地址
    5. SO_LINGER 有数据传输时延缓关闭Channel(只有在非阻塞模式下才有用)
    6. TCP_NODELAY 禁用Nagle算法

3.2.3 SocketChannel的使用

(1)创建SocketChannel

方式一:

使用有参open方法

SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 80));

方式二:

使用无参open方法,但是无参只是创建了一个SocketChannel对象,并没有进行实质性的tcp连接,需要使用connect()方法进行连接

SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("www.baidu.com", 80));
(2)连接校验
socketChannel.isOpen(); // 测试SocketChannel是否为Open状态
socketChannel.isConnected();    // 测试SocketChannel是否已经被连接
socketChannel.isConnectionPending();    // 测试SocketChannel是否正在进行连接
socketChannel.finishConnect();  // 校验正在进行连接的SocketChannel是否已经完成连接
(3)阻塞模式
socketChannel.configureBlocking(false); // false非阻塞,true阻塞
(4)读写
// 阻塞式读
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8888));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("read over");

// 非阻塞式读
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8888));
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
socketChannel.read(byteBuffer);
socketChannel.close();
System.out.println("read over");
(5)设置和获取参数
/***
* 获取相关的参数
* SO_SNDBUF 套接字发送缓冲区大小
* SO_RCVBUF 套接字接收缓冲区大小
* SO_KEEPALIVE 保活连接
* O_REUSEADDR 复用地址
* SO_LINGER 有数据传输时延缓关闭 Channel (只有在非阻塞模式下有用)
* TCP_NODELAY 禁用 Nagle 算法
*/
// 设置参数
socketChannel
	.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE)
    .setOption(StandardSocketOptions.TCP_NODELAY, Boolean.TRUE);
// 获取参数
socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
socketChannel.getOption(StandardSocketOptions.TCP_NODELAY);

以下为SocketChannel的使用demo:

package com.hydrangea.channel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/**
 * @Author hydrangea
 * @Date 2023/3/28 12:10
 * @Version 1.0.0
 * @Description SocketChannel面向tcp连接
 */
public class SocketChannelDemo {

    public static void main(String[] args) throws IOException {
        // 创建SocketChannel
        // SocketChannel socketChannel = SocketChannel.open();
        // socketChannel.connect(new InetSocketAddress("www.baidu.com", 80));
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8888));

        /*
            socketChannel.isOpen(); // 测试SocketChannel是否为Open状态
            socketChannel.isConnected();    // 测试SocketChannel是否已经被连接
            socketChannel.isConnectionPending();    // 测试SocketChannel是否正在进行连接
            socketChannel.finishConnect();  // 校验正在进行连接的SocketChannel是否已经完成连接
        */

        // 设置阻塞和非阻塞
        socketChannel.configureBlocking(false);

        /***
         * 获取相关的参数
         * SO_SNDBUF 套接字发送缓冲区大小
         * SO_RCVBUF 套接字接收缓冲区大小
         * SO_KEEPALIVE 保活连接
         * O_REUSEADDR 复用地址
         * SO_LINGER 有数据传输时延缓关闭 Channel (只有在非阻塞模式下有用)
         * TCP_NODELAY 禁用 Nagle 算法
         */
        socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE)
                .setOption(StandardSocketOptions.TCP_NODELAY, Boolean.TRUE);
        socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
        socketChannel.getOption(StandardSocketOptions.TCP_NODELAY);

        // 非阻塞读
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int byteReads = socketChannel.read(byteBuffer);
        while (byteReads != -1) {
            System.out.println("读取的字节数: " + byteReads);
            // 转换为读取状态
            byteBuffer.flip();
            while (byteBuffer.hasRemaining()) {
                System.out.print((char)byteBuffer.get());
            }
            byteBuffer.clear();
            byteReads = socketChannel.read(byteBuffer);
        }
        socketChannel.close();
        System.out.println("\nread over");
    }
}

3.3 DatagramChannel

如SocketChannel对应Socket,ServerSocketChannel对应ServerSocket,每一个DatagramChannel对象也有一个关联的DatagramSocket对象。

SocketChannel模拟连接导向的流协议(如TCP/IP),DatagramChannel则模拟包导向的无连接协议(如UDP/IP)。

DatagramChannel是无连接的,每个数据报(datagram)都是一个自包含的实体,拥有自己的目的地址及不依赖其他数据报的数据负载。与面向流的socket不同,DatagramChannel可以发送单独的数据报给不同的目的地址。同样,DatagramChannel对象也可以接收任意地址的数据报,每个到达的数据报都含有源地址信息。

1.创建DatagramChannel

DatagramChannel sendChannel = DatagramChannel.open();
InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1", 9999);

2.发送数据

通过send()发送UDP包

DatagramChannel sendChannel = DatagramChannel.open();
InetSocketAddress sendAddress = new InetSocketAddress("127.0.0.1", 9999);
int i = 0;
while (true) {
    // sendChannel.send(ByteBuffer.wrap(("数据包"+i++).getBytes(StandardCharsets.UTF_8)), sendAddress);
    ByteBuffer byteBuffer = ByteBuffer.wrap(("数据包" + i++).getBytes(StandardCharsets.UTF_8));
    sendChannel.send(byteBuffer, sendAddress);
    System.out.println("发包端发送数据包");
    Thread.sleep(1000);
}

3.接收数据

通过receive()接收UDP包

// 创建DatagramChannel并绑定对应的端口号
DatagramChannel receiveChannel = DatagramChannel.open();
// receiveChannel.socket().bind(new InetSocketAddress(9999));
receiveChannel.bind(new InetSocketAddress(9999));

// 接受数据
ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
while (true) {
    receiveBuffer.clear();
    SocketAddress sendAddress = receiveChannel.receive(receiveBuffer);
    receiveBuffer.flip();
    System.out.println(sendAddress.toString() + " ");
    // Charset charset = Charset.forName("UTF-8");
    // CharBuffer charBuffer = charset.decode(receiveBuffer);
    // System.out.println(charBuffer);
    System.out.println(Charset.forName(String.valueOf(StandardCharsets.UTF_8)).decode(receiveBuffer));
}

4.建立连接

UDP不存在真正意义上的连接,这里的连接是向特定服务地址用read()write()接收或者发送数据包

DatagramChannel connectChannel = DatagramChannel.open();
// 接受方绑定端口
connectChannel.bind(new InetSocketAddress(9998));
// 发送方建立与接收方的连接
connectChannel.connect(new InetSocketAddress("127.0.0.1", 9998));
connectChannel.write(ByteBuffer.wrap("数据包".getBytes(StandardCharsets.UTF_8)));
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
while (true) {
    readBuffer.clear();
    connectChannel.read(readBuffer);
    readBuffer.flip();
    System.out.println(StandardCharsets.UTF_8.decode(readBuffer));
}

read()write()只有在connect之后才能使用,否则将抛出NotYetConnectException异常。用read()接收时,如果没有接收到包,会抛出PortUnreachableException异常。

第四章 Java NIO(Buffer)

4.1 Buffer 简介

Java NIO中的Buffer用于和NIO通道进行交互。数据从通道读入缓冲区,从缓冲区写入到通道中。

buffer

缓冲区本质上是一块可以进行读写的内存。这块内存被封装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。缓冲区实际上是一个容器对象,各自对应各自类型的数组,如ByteBuffer为一个byte数组,在NIO中所有数据都由缓冲区来进行处理。

读取数据时,从channel读取数据并写入到buffer中;写入数据时,读取buffer中的数据并写入到缓冲区中。而在面向IO流的系统中,所有数据都是直接写入或者直接将数据读取到Stream对象中的。

在NIO中,所有的缓冲区类型都继承自抽象类Buffer,最常用的有ByteBuffer,对于Java中的基本类型,基本都有一个具体Buffer类型相对应,继承关系如下图所示:

buffer-继承关系

4.2 Buffer的基本用法

1、使用Buffer读写数据,一般遵循以下四个步骤:

  1. 写入数据到buffer
  2. 调用flip()方法
  3. 从buffer中读取数据
  4. 调用clear()方法或者compact()方法

当向buffer写入数据时,buffer会记录下写了多少数据。一旦要读取数据,通过调用flip()方法将Buffer从写模式切换到读模式。在读模式下,可以读取之前写入到buffer的所有数据。一旦读完了所有的数据,就需要清空缓冲区,让它可以再次被写入。有两种方式能够清空缓冲区:调用clear()compact()方法。clear()方法会清空整个缓冲区。 compact()方法只会清楚已经读取过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。

2、使用buffer的demo

// 创建FileChannel
RandomAccessFile file = new RandomAccessFile("E:\\java\\nio\\test\\test.txt","rw");
FileChannel channel = file.getChannel();
// 创建Buffer缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 将FileChannel中的数据读取到buffer中,返回值为读取到的文件位置
int bytesRead = channel.read(buffer);
// 以字符的形式打印读取到的文件数据,-1为读取到了文件的结尾
while (bytesRead != -1) {
    System.out.println("读取文件的字节数: " + bytesRead);
    // 翻转缓冲区,将limit设置为当前position的值,将position置为0
    buffer.flip();
    // 判断是否读取结束
    while (buffer.hasRemaining()) {
        // 打印文件内容
        System.out.print((char)buffer.get());
    }
    System.out.println();
    buffer.clear();
    bytesRead = channel.read(buffer);
}
channel.close();
System.out.println("结束了");

3、使用IntBuffer的demo

// 创建buffer
IntBuffer intBuffer = IntBuffer.allocate(8);

// 放入数据到buffer中
for (int i = 0; i < intBuffer.capacity(); i++) {
	intBuffer.put(2 * i);
}

// 读取buffer
intBuffer.flip();
while (intBuffer.hasRemaining()) {
	System.out.print(intBuffer.get() + " ");
}

4.3 Buffer的capacity、position和limit

为了理解Buffer的工作原理,需要熟悉Buffer中的三个属性

  • capacity

  • position

  • limit

position和limit的含义取决于Buffer处在读模式还是写模式。不管Buffer处在什么模式,capacity的含义总是一样的。

Buffer 写模式 Buffer读模式
buffer-写模式 buffer-读模式
  • capacity

代表Buffer的容量,能够写入数据最大的长度。当Buffer达到最大长度,需要进行Buffer的清空(通过读数据或者清除数据)才能够继续往Buffer里写数据。

  • position

    1)写数据到Buffer中时,position代表写入数据的当前位置,position的初始值为0。当一个byte、long等数据写到Buffer后,position会向下移动到下一个可插入数据的Buffer单元。position最大可为capacity - 1(因为position的初始值为0)。

    2)从Buffer中读取数据时,position代表读取数据的当前位置,如position=2时表示已经开始读取了3个byte,或者从第3个byte开始读取。通过ByteBuffer.flip()切换到读模式时,position会被重置为0,当Buffer从position读取数据后,position会下移到下一个可读取的数据Buffer单元。

  • limit

    1)写数据时,limit表示可对Buffer最多写入多少个数据。写模式下,limit等于Buffer的capacity 。

    2)读数据时,limit表示Buffer里有多少可读数据(not null的数据),因此能读到之前写入的所有的数据(limit被设置成已写数据的数量,这个值在写模式下就是position)。

    如:在写数据到Buffer中时,写入了5个byte,则 position = 4,通过调用flip()方法将buffer切换到读模式,能读取的最大的长度为limit,limit 等于原先写入的数据量(即写数据时的position)。

4.4 Buffer的类型

Java NIO有以下Buffer的类型

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

这些Buffer类型代表了不同的数据类型。分别对应char,short,int,long,float或double类型来操作缓冲区中的字节。

4.5 Buffer分配和写数据

1、Buffer分配

通过调用allocate(int capacity)方法来 创建一个buffer对象,每一个Buffer类都由一个allocate(int capacity)方法

// 分配一个可存储1024字节的ByteBuffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 分配一个可存储1024字符的CharBuffer
CharBuffer charBuffer = CharBuffer.allocate(1024);

通过调用Buffer类中的wrap(byte[] array)方法也可以创建一个Buffer对象

ByteBuffer byteBuffer = ByteBuffer.wrap("hello ServerSocketChannel".getBytes());

2、向Buffer中写入数据

写数据到Buffer有两种方式:

(1)从Channel读取数据并写入到Buffer。

Channel.read(byteBuffer);

(2)通过Buffer的put()方法写到Buffer里。

ByteBuffer byteBuffer = ByteBuffer.allocate(10);
byteBuffer.put(1);

3、从Buffer中读取数据

(1)从Buffer中读取数据写入到Channel

channel.write(byteBuffer);

(2)使用get()方法从Buffer中读取数据

byte aByte = buf.get();

4、模式切换 flip

通过调用flip()方法,可以将写模式切换到读模式。即调用flip()方法后,会将position置为0,并将limit设置成之前写模式时候的position的值。

在写模式下,position代表要写入或者正在写入的数据的位置,limit为buffer的最大容量(limit = capacity

在读模式下,position代表要读取或者正在读取的数据的位置,limit为写模式下的position的值

4.6 Buffer的几个方法

  • rewind()

    Buffer.rewind()将position设回0,所以可以重读Buffer中的所有数据。limit保持不变,仍然表示能从Buffer中读取多少个元素(byte、char等)

  • clear()

    调用clear()方法,position将被设置为0,limit被设置为capacity的值。即Buffer被清空了,但是Buffer中的数据并未清楚,只不过这些标记的意义是可以重新开始写入数据到Buffer中了。

  • compact()

    compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。

  • mark()与reset()

    通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以调用Buffer.reset()方法恢复到这个position。

4.7 缓冲区操作

  • 缓冲区分片

    在现有缓冲区上切出来一片作为一个新的缓冲区,但现有的缓冲区与创建的子缓冲区在底层数组层面上是数据共享的,即子缓冲区相当于现有缓冲区的一个视图窗口。通过调用**slice()**方法可以创建一个子缓冲区

  • 只读缓冲区

    只读缓冲区即只能够读取,但不能够写入数据。通过调用asReadOnlyBuffer()方法,将任何常规缓冲区转换为只读缓冲区,此方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据。如果原缓冲区的数据发生了变化,那么只读缓冲区的内容也将随之发生变化。如果尝试修改只读缓冲区的内容,将会抛出ReadOnlyBufferException异常。

  • 直接缓冲区

    直接缓冲区是为了加快IO速度,使用一种特殊方式为其分配内存的缓冲区。JDK文档描述为:给定一个直接字节缓冲区,Java虚拟机将尽最大努力直接对它执行本机IO操作。也就是说,它会在每一次调用底层操作系统的本机IO操作之前(或之后),尝试避免将缓冲区的内容拷贝到一个中间缓冲区中,或者从一个中间缓冲区中拷贝数据。通过调用allocateDirect()方法可以创建一个直接缓冲区。

  • 内存映射文件IO

    内存映射文件IO是一种读和写文件数据的方法,它可以比常规的基于流或者基于通道的IO快的多。内存映射文件IO是通过使文件中的数据出现为内存数组的内容来完成的,只有文件中实际读取或者写入的部分才会映射到内存中。

缓冲区分片demo

/**
 * 子缓冲区
 */
@Test
public void bufferSlice() {
    ByteBuffer byteBuffer = ByteBuffer.allocate(8);

    for (int i = 0; i < byteBuffer.capacity(); i++) {
        byteBuffer.put((byte)i);
    }
    // 创建子缓冲区
    byteBuffer.position(3);
    byteBuffer.limit(7);
    ByteBuffer subByteBuffer = byteBuffer.slice();

    // 改变子缓冲区内容, 修改子缓冲区的数据会同步修改缓冲区中的数据
    for (int i = 0; i < subByteBuffer.capacity(); i++) {
        byte value = subByteBuffer.get(i);
        value *=10;
        subByteBuffer.put(value);
    }
    byteBuffer.position(0);
    byteBuffer.limit(byteBuffer.capacity());

    System.out.println("byteBuffer内容: ");
    while (byteBuffer.hasRemaining()) {
        System.out.print(byteBuffer.get() + " ");
    }
    subByteBuffer.flip();
    System.out.println("\nsubByteBuffer内容:");
    while (subByteBuffer.hasRemaining()) {
        System.out.print(subByteBuffer.get() + " ");
    }
}

只读缓冲区demo

/**
 * 只读缓冲区
 */
@Test
public void bufferReadOnly() {
    // 创建缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(10);

    for (int i = 0; i < buffer.capacity(); i++) {
        buffer.put((byte) (i + 1));
    }

    ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();

    /*
        // 测试使用compact函数在未读数据的后面进行数据的续写
        buffer.flip();
        while (buffer.hasRemaining()) {
            if (buffer.position() == 3) {
                break;
            }
            System.out.print(buffer.get() + " ");
        }

        buffer.compact();
        buffer.put((byte) 20);

        System.out.println();
        buffer.flip();
        while (buffer.hasRemaining()) {
            System.out.print(buffer.get() + " ");
        }*/

    // 转换为读模式
    readOnlyBuffer.flip();
    while (readOnlyBuffer.hasRemaining()) {
        System.out.print(readOnlyBuffer.get() + " ");
    }
    readOnlyBuffer.compact();
    readOnlyBuffer.put((byte)9999);
}

直接缓冲区demo

/**
 * 直接缓冲区
 * @throws IOException
 */
@Test
public void bufferDirect() throws IOException {
    FileInputStream fis = new FileInputStream(new File("E:\\java\\nio\\test\\test.txt"));
    FileChannel fisChannel = fis.getChannel();

    FileOutputStream fos = new FileOutputStream(new File("E:\\java\\nio\\test\\testDirectBuffer.txt"));
    FileChannel fosChannel = fos.getChannel();

    // 创建直接缓冲区
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    while(true) {
        buffer.clear();
        int readBytes = fisChannel.read(buffer);
        if (readBytes == -1) {
            break;
        }
        buffer.flip();
        fosChannel.write(buffer);
    }
    fis.close();
    fisChannel.close();
    fos.close();
    fosChannel.close();
}

内存映射文件IO demo

/**
 * 内存映射文件IO
 */

private static final int start = 0;

private static final int size = 1024;

@Test
public void bufferMappingIO() throws IOException {
    RandomAccessFile randomAccessFile = new RandomAccessFile("E:\\java\\nio\\test\\test.txt", "rw");
    FileChannel fileChannel = randomAccessFile.getChannel();
    MappedByteBuffer mbb = fileChannel.map(FileChannel.MapMode.READ_WRITE, start, size);

    mbb.put(0, (byte)97);
    mbb.put(1023, (byte)122);
    fileChannel.close();
}

第五章 Java NIO(Selector)

5.1 Selector 简介

1、Selector和Channel关系

Selector一般称为选择器,即多路复用器。它是Java NIO核心组件中的一个,用于检查一个或者多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络连接。

selector

使用Selector的好处在于:可以使用更少的线程处理通道,相比与使用多个线程,避免了线程上下文切换带来的开销。

2、可选择通道(SelectableChannel)

(1)不是所有的Channel都可以被Selector复用的。比如。FileChannel就不能被选择器复用。判断一个Channel能否被Selector复用,需要判断是否继承了一个抽象类SelectableChannel。如果继承了SelectableChannel,则可以被复用,否则不能。

(2)SelectableChannel类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。所有socket通道,都继承了SelectableChannel类,都是可选择的,包括从管道(Pipe)对象中获得的通道。而FileChannel类,没有继承SelectableChannel,因此不是可选通道。

(3)一个通道可以被注册到多个选择器上,但对于每个选择器而言只能被注册一次。通道和选择器之间的关系,使用注册的方式完成。SelectableChannel可以被注册到Selector对象上,在注册的时候,需要指定通道的哪些操作,是Selector感兴趣的。

selector-channel

3、Channel注册到Selector

(1)使用Channel.register(Selector sel, int ops)方法,将一个通道注册到一个选择器时。第一个参数为通道要注册到的选择器,第二个参数为指定选择器需要查询的通道操作(即选择器感兴趣的操作)

(2)可以供选择器查询的通道操作,从类型来分,包括以下四种:

  • 可读:SelectionKey.OP_READ
  • 可写:SelectionKey.OP_WRITE
  • 连接:SelectionKey.OP_CONNECT
  • 接收:SelectionKey.OP_ACCEPT

如果Selector对通道的多操作类型感兴趣,可以用“位或”操作符来实现:

如:int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE

(3)选择器查询的不是通道的操作,而是通道的某个操作的一种就绪状态。

操作的就绪状态:指通道具备完成某个操作的条件,表示该通道的某个操作已经就绪,就可以被Selector查询到,程序可以对通道进行对应的操作。

比如:

SocketChannel通道可以连接到一个服务器,则处于“连接就绪状态(OP_CONNECT)”。

ServerSockecChannel通道准备好接收新进入的连接,则处于“接收就绪状态(OP_ACCEPT)”。

一个有数据可读的通道,则为“读就绪(OP_READ)”。一个等待写数据的通道,则为“写就绪(OP_WRITE)”。

4、选择键(SelectionKey)

(1)Channel注册到Selector后,并且一旦通道处于某种就绪的状态,就可以被Selector查询到。使用选择器Selector的select()方法可以查询到就绪状态的通道,并对感兴趣的通道进行操作。

(2)Selector可以不断的查询Channel中发生的操作的就绪状态。并且挑选感兴趣的操作就绪状态。一旦通道有操作的就绪状态达成,并且是Selector感兴趣的操作,就会被Selector选中,放入选择键集合中。

(3)一个选择键,首先是包含了注册在Selector的通道操作的类型,如:SelectionKey.OP_READ。也包含了特定的通道与特定的选择器之间的注册关系。根据对应的选择键,进行不同的业务逻辑进行处理。

(4)一个选择键类似于监听器模式中的一个事件。由于Selector不是事件触发的模式,而是主动去查询的模式,所以不叫事件,而是叫SelectionKey选择键。

5.2 Selector的使用方法

1、Selector的创建

// 创建selector选择器
Selector selector = Selector.open();

2、注册Channel到Selector

// 创建selector选择器
Selector selector = Selector.open();

// 创建ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 配置通道非阻塞
serverSocketChannel.configureBlocking(false);

// 绑定连接
serverSocketChannel.bind(new InetSocketAddress(9999));

// 将通道注册到Selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

注:

(1)与Selector一起使用时,Channel必须处于非阻塞模式下,否则将抛出异常IllegalBlockingModeException。这意味着,FileChannel不能与Selector一起使用,因为FileChannel不能切换到非阻塞模式,而Socket相关的所有通道都可以。

(2)一个通道,并没有一定要支持所有的四中操作。如:ServerSocketChannel支持Accept接受操作,而SocketChannel则不支持。通过调用通道的validOps()方法,来获取特定通道下所有支持的操作集合。

3、轮询查询就绪操作

(1)通过Selector的select()方法,可以查询处已经就绪的通道操作,这些就绪的状态的集合,包含在一个SelectionKey对象的Set集合中。

(2)下面为Selector几个重载的查询select()方法;

  • select():阻塞到至少有一个通道在你注册的时间上就绪了
  • select(long timeout):最长阻塞时间为timeout毫秒
  • selectNow():非阻塞,只要有通道就绪就立刻返回。

select()方法返回类型为int,代表有多少通道已经就绪(自前一次select()方法以来,到这一次select()方法之间的时间段上,有多少通道变成就绪状态)。

例如:首次调用 select()方法,如果有一个通道变成就绪状态,返回了 1,若再次调用
select()方法,如果另一个通道就绪了,它会再次返回 1。如果对第一个就绪的
channel 没有做任何操作,现在就有两个就绪的通道,但在每次 select()方法调用之间,
只有一个通道就绪了。

一旦调用select()方法,且返回值不为0时,在Selector中有一个selectedKey()方法,用来访问已选择键的集合,迭代集合的每一个选择键元素,根据就绪操作的类型,完成对应的操作:

// 查询已经就绪通道操作
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历集合
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    // 判断key的就绪状态操作
    if (key.isAcceptable()) {
        // a connection was accepted by a SeverSocketChannel
    } else if (key.isConnectable()) {
        // a connection was established with a remote server
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is read for writing
    }
    iterator.remove();
}

4、停止选择的方法

选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,可以使用以下方法唤醒在select()方法中阻塞的线程。

wakeup()方法:通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回,该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。

close()方法:通过close()方法关闭Selector。该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。

5.3 Channel注册到Selector及业务操作步骤

  1. 创建Selector选择器
  2. 创建ServerSocketChannel,绑定监听端口
  3. 设置channel为非阻塞
  4. 注册channel到Selector
  5. 调用Selector的select()方法(循环调用),监测通道的就绪状况
  6. 调用selectedKeys()方法获取已就绪的channel通道集合
  7. 遍历就绪channel集合,判断就绪事件类型,实现具体业务操作
  8. 根据业务,决定是否需要再次注册监听时间,重复执行第三步操作

以下为一个Selector的demo

 // 1.获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2.切换到非阻塞模式
serverSocketChannel.configureBlocking(false);
// 3.创建buffer
ByteBuffer serverByteBuffer = ByteBuffer.allocate(1024);
// 4.绑定监听端口号
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",8080));
// 5.获取Selector选择器
Selector selector = Selector.open();
// 6.通道注册到选择器,进行监听
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 7.选择器进行轮询操作
while(selector.select() > 0) {
    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    // 迭代器遍历
    Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
    while (selectionKeyIterator.hasNext()) {
        // 获取就绪操作
        SelectionKey next = selectionKeyIterator.next();
        // 判断是什么操作
        if (next.isAcceptable()) {
            // 获取连接
            SocketChannel socketChannel = serverSocketChannel.accept();
            // 切换为非阻塞模式
            socketChannel.configureBlocking(false);
            // 注册
            socketChannel.register(selector, SelectionKey.OP_READ);
        } else if (next.isReadable()) {
            SocketChannel channel = (SocketChannel) next.channel();
            ByteBuffer byteReadBuffer = ByteBuffer.allocate(1024);

            // 读取数据
            int length = 0;
            while((length = channel.read(byteReadBuffer)) > 0) {
                byteReadBuffer.flip();
                System.out.println(new String(byteReadBuffer.array(),0, length));
                byteReadBuffer.clear();
            }
        }
    }
}

第六章 Java NIO(Pipe和FileLock)

6.1 Pipe

Java NIO管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据将会被写到sink通道,从source通道读取。

pipe

1、创建管道

// 1.获取管道
Pipe pipe = Pipe.open()

2、写入管道

// 2.获取sink通道
Pipe.SinkChannel sinkChannel = pipe.sink();
// 3.创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
// 4.写入数据
sinkChannel.write(byteBuffer);

3、从管道读取数据

// 5.获取source通道
Pipe.SourceChannel sourceChannel = pipe.source();
// 6.创建缓冲区
ByteBuffer byteBuffer1 = ByteBuffer.allocate(1024);
// 7.读取数据 read()方法返回的int值代表多少数据读入了缓冲区
int length = sourceChannel.read(byteBuffer1);
System.out.println(new String(byteBuffer1.array(), 0, length));

Pipe demo

// 1.获取管道
Pipe pipe = Pipe.open();
// 2.获取sink通道
Pipe.SinkChannel sinkChannel = pipe.sink();
// 3.创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
// 4.写入数据
sinkChannel.write(byteBuffer);
// 5.获取source通道
Pipe.SourceChannel sourceChannel = pipe.source();
// 6.创建缓冲区
ByteBuffer byteBuffer1 = ByteBuffer.allocate(1024);
// 7.读取数据
int length = sourceChannel.read(byteBuffer1);
System.out.println(new String(byteBuffer1.array(), 0, length));
// 8.关闭通道
sourceChannel.close();
sinkChannel.close();

6.2 FileLock

1、FileLock简介

FileLock即文件锁,给文件加锁,同一时间,只能有一个程序修改此文件,或者程序都只能读此文件,解决了多个程序同时访问、修改文件数据不同步出现的问题。

文件锁是进程级别的,不是线程级别的。文件锁可以解决多个进程并发访问、修改同一个文件的问题,但不能结局多线程并发访问、修改同一文件的问题。使用文件锁时,同一进程内的多个线程,仍然可以同时访问、修改此文件。

文件锁是当前程序所述的JVM实例持有的,一旦获取到文件锁(对文件加锁),要调用release(),或者关闭对应的FileChannel对象,或者当前JVM退出,才会释放这个锁。

一旦某个进程(如JVM实例)对某个文件加锁,则在释放这个锁之前,此进程不能再对此文件加锁,即JVM实例在同一文件上的文件锁是不重叠的(进程级别不能再同一文件上获取锁)

2、文件锁分类

排它锁:又叫独占锁。对文件加排它锁后,该进程可以对此文件进行读写,该进程独
占此文件,其他进程不能读写此文件,直到该进程释放文件锁。

共享锁:某个进程对文件加共享锁,其他进程也可以访问此文件,但这些进程都只能
读此文件,不能写。线程是安全的。只要还有一个进程持有共享锁,此文件就只能读,
不能写

3、文件锁 demo

public static void main(String[] args) throws IOException {

    String str = "test fileLock";
    System.out.println(String.format("输入: %s", str));
    ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));

    // 创建Path对象
    String filePath = "E:\\java\\nio\\test\\test.txt";
    Path path = Paths.get(filePath);
    // 创建FileChannel
    FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
    fileChannel.position(fileChannel.size() - 1);

    // 获取锁方法一:lock() 阻塞方法,当文件锁不可用时,当前进程会被挂起
    // 独占锁
    // fileChannel.lock();
    // 共享锁
    FileLock lock = fileChannel.lock(0, Long.MAX_VALUE, true);
    System.out.println("共享锁shared: " + lock.isShared());

    fileChannel.write(byteBuffer);
    // 关闭当前FileChannel对象,释放文件锁
    // lock.release();
    fileChannel.close();
    System.out.println("写操作完成");
    // 读取数据
    readPrint(filePath);
}

private static void readPrint(String filePath) throws IOException {
    FileReader fileReader = new FileReader(filePath);
    BufferedReader bufferedReader = new BufferedReader(fileReader);
    String tr = bufferedReader.readLine();
    System.out.println("读取内容");
    while(tr != null) {
        System.out.println("   " + tr);
        tr = bufferedReader.readLine();
    }
    fileReader.close();
    bufferedReader.close();
}

4、获取文件锁方法

有 4 种获取文件锁的方法:

  • lock() //对整个文件加锁,默认为排它锁。
  • lock(long position, long size, booean shared) //自定义加锁方式。前 2 个参数
    指定要加锁的部分(可以只对此文件的部分内容加锁),第三个参数值指定是否是共
    享锁。
  • tryLock() //对整个文件加锁,默认为排它锁。
  • tryLock(long position, long size, booean shared) //自定义加锁方式。
    如果指定为共享锁,则其它进程可读此文件,所有进程均不能写此文件,如果某进程
    试图对此文件进行写操作,会抛出异常

5、lock与tryLock的区别

lock是阻塞式的,如果未获取到文件锁,会一直阻塞当前线程,直到获取到文件锁。

tryLock和lock的作用相同,只不过tryLock是非阻塞式的,tryLock是尝试获取文件锁,获取成功就返回锁对象,否则返回null,不会阻塞当前线程。

6、FileLock两个方法

  • boolean isShared() // 此文件锁是否是共享锁
  • boolean isValid() // 此文件锁是否还有效

在某些OS上,对某个文件加锁后,不能够对此文件使用通道映射。

第七章 Java NIO(其他部分)

7.1 Path

1、Path简介

Java Path 接口是 Java NIO 更新的一部分,同 Java NIO 一起已经包括在 Java6 和
Java7 中。Java Path 接口是在 Java7 中添加到 Java NIO 的。Path 接口位于
java.nio.file 包中,所以 Path 接口的完全限定名称为 java.nio.file.Path。

Java Path 实例表示文件系统中的路径。一个路径可以指向一个文件或一个目录。路径
可以是绝对路径,也可以是相对路径。绝对路径包含从文件系统的根目录到它指向的
文件或目录的完整路径。相对路径包含相对于其他路径的文件或目录的路径。

在许多方面,java.nio.file.Path 接口类似于 java.io.File 类,但是有一些差别。不过,
在许多情况下,可以使用 Path 接口来替换 File 类的使用。

2、创建Path实例

使用 java.nio.file.Path 实例必须创建一个 Path 实例。可以使用 Paths 类
(java.nio.file.Paths)中的静态方法 Paths.get()来创建路径实例。

// 创建绝对路径
Path path = Paths.get("E:\\java\\nio\\..\\test\\test.txt");

3、创建绝对路径

(1)创建绝对路径,通过调用Paths.get()方法,给定绝对路径文件作为参数来完成。

Path path = Paths.get("E:\java\nio\test\test.txt");

(2)如果在Linux、MacOS等操作系统上,绝对路径如下

Path path = Paths.get("/home/test/nio/test.txt");

(3)如果在Windows机器上使用了从/开始的路径,那么路径将被解释为相对于当前驱动器

4、创建相对路径

Java NIO Path 类也可以用于处理相对路径。使用 Paths.get(basePath, relativePath)方法创建一个相对路径

Path path = Paths.get("E:\\java\\nio", "\\test\\test.txt");
System.out.println(path); // 输出结果为E:\java\nio\test\test.txt

5、路径标准化

通过调用Path.normalize()方法可以使路径标准化。标准化意味着它将移除所有在路径字符串的中间的.和…代码,并解析路径字符串所引用的路径。

// 创建绝对路径
Path path = Paths.get("E:\\java\\nio\\..\\test\\test.txt");
Path normalizePath = path.normalize();
System.out.println(normalizePath);	// 输出结果为E:\java\test\test.txt

7.2 Files

Java NIO Files 类(java.nio.file.Files)提供了几种操作文件系统中的文件的方法。以下
内容介绍 Java NIO Files 最常用的一些方法。java.nio.file.Files 类与java.nio.file.Path 实例一起工作,因此在学习 Files 类之前,需要先了解 Path 类。

1、创建新目录

  • Files.createDirectories(Path path)
  • Files.createDirectory(Path path)

使用Files.createDirectories(Path path)方法和Files.createDirectory(Path path)都可以创建目录,两者的区别在于,如果Path的路径中目标目录的父目录不存在,前者会一并创建父目录,后者则会抛出NoSuchFileException异常,如果目标文件已存在,两个方法都会抛出FileAlreadyExistsException异常

String strPath = "E:\\java\\nio\\test\\test.txt";
Path path = Paths.get(strPath);
// 会创建父级目录
try {
    Path directories = Files.createDirectories(path);

    // createDirectory
    // 如果没有父级目录,则抛出异常
    Path directory = Files.createDirectory(path);
    System.out.println(directory);
} catch (NoSuchFileException e) {
    System.out.println(e.getMessage());
} catch(FileAlreadyExistsException e) {
    System.out.println(e.getMessage());
}

2、复制文件

  • Files.copy(Path source, Path target, CopyOption… options)

source:源文件

target:目标文件

options:标准复制选项,此参数有三种:REPLACE_EXISTING(替换现有文件),COPY_ATTRIBUTES(复制到新文件),ATOMIC_MOVE(将文件作为原子文件系统操作进行移动)

使用**Files.copy()**方法从一个路径拷贝一个文件到另外一个目录

String strPath = "E:\\java\\nio\\test\\test.txt";
Path path = Paths.get(strPath);
Path copyPath = Paths.get("E:\\java\\nio\\test\\copy\\copyTest.txt");
Files.copy(path, copyPath, StandardCopyOption.REPLACE_EXISTING);

3、移动文件

  • Files.move(Path source, Path target, CopyOption… options)

用于将文件从一个路径移动到另一个路径。移动文件与文件重命名的操作都是用此方法。

参数意义同复制文件。

String strPath = "E:\\java\\nio\\test\\test.txt";
Path path = Paths.get(strPath);
Path movePath = Paths.get("E:\\java\\nio\\test\\move\\moveTest.txt");
Files.copy(path, movePath, StandardCopyOption.REPLACE_EXISTING);

4、删除文件

  • Files.delete(Path path)

参数为要删除文件的Path,如果对应的目标文件不存在,则会抛出NoSuchFileException

Path deletePath = Paths.get("E:\\java\\nio\\test\\copy\\copyTest.txt");
Files.delete(deletePath);

5、遍历目录树

  • Files.walkFileTree(Path start, FileVisitor<? super Path> visitor)

(1)Files.walkFileTree()方法包含递归遍历目录树功能,将 Path 实例和 FileVisitor
作为参数。Path 实例指向要遍历的目录,FileVisitor 在遍历期间被调用。

(2)FileVisitor 是一个接口,必须自己实现 FileVisitor 接口,并将实现的实例传递给
walkFileTree()方法。在目录遍历过程中,您的 FileVisitor 实现的每个方法都将被调用。如果不需要实现所有这些方法,那么可以扩展 SimpleFileVisitor 类,它包含FileVisitor 接口中所有方法的默认实现。

(3)FileVisitor 接口的方法中,每个都返回一个 FileVisitResult 枚举实例。
FileVisitResult 枚举包含以下四个选项:

  • CONTINUE 继续
  • TERMINATE 终止
  • SKIP_SIBLING 跳过同级
  • SKIP_SUBTREE 跳过子级

遍历目录树demo:

// walkFileTree
Path parentPath = Paths.get("E:\\java\\nio");
String fileToFind = "moveTest.txt";
Files.walkFileTree(parentPath, new SimpleFileVisitor<Path>() {
    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws
        IOException {
        String fileString = file.toAbsolutePath().toString();
        //System.out.println("pathString = " + fileString);
        if (fileString.endsWith(fileToFind)) {
            System.out.println("file found at path: " + file.toAbsolutePath());
            return FileVisitResult.TERMINATE;
        }
        return FileVisitResult.CONTINUE;
    }
});

7.3 AsynchronousFileChannel

在 Java 7 中,Java NIO 中添加了 AsynchronousFileChannel,也就是是异步地将数
据写入文件。

1、创建AsynchronousFileChannel

通过静态方法open()创建

Path path = Paths.get("E:\\java\\nio\\test\\test.txt");
AsynchronousFileChannel fileChannel = null;
try {
    fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
    e.printStackTrace();
}

open()方法的第一个参数指向与AsynchronousFileChannel相关联文件的Path实例。

第二个参数是一个或多个打开选项,它告诉AsynchronousFileChannel在文件上执行什么操作。

2、通过Future读取数据

通过以下两种方式可以从AsynchronousFileChannel读取数据。第一种方式是调用返回read()方法返回Future对象

Path path = Paths.get("E:\\java\\nio\\test\\test.txt");
AsynchronousFileChannel fileChannel = null;
try {
    fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
    e.printStackTrace();
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
long position = 0;
// 通过Future读取数据
Future<Integer> operation = fileChannel.read(byteBuffer, position);

while(!operation.isDone());

byteBuffer.flip();
byte[] data = new byte[byteBuffer.limit()];
byteBuffer.get(data);
System.out.println(new String(data));
byteBuffer.clear();

(1)创建了一个 AsynchronousFileChannel
(2)创建一个 ByteBuffer,它被传递给 read()方法作为参数,以及一个 0 的位置。
(3)在调用 read()之后,循环,直到返回的 isDone()方法返回 true。
(4)读取操作完成后,数据读取到 ByteBuffer 中,然后打印到 System.out

3、通过CompletionHandler读取数据

通过调用read()方法时,将CompletionHandler作为参数传入

Path path = Paths.get("E:\\java\\nio\\test\\test.txt");
AsynchronousFileChannel fileChannel = null;
try {
    fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
} catch (IOException e) {
    e.printStackTrace();
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
long position = 0;
fileChannel.read(byteBuffer, position, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println("result = " + result);
        attachment.flip();
        byte[] data = new byte[attachment.limit()];
        attachment.get(data);
        System.out.println(new String(data));
        attachment.clear();
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("error");
    }
});

(1)读取操作完成,将调用 CompletionHandler completed()方法。
(2)对于 completed()方法的参数传递一个整数,它告诉我们读取了多少字节,以及传递给 read()方法的“附件”。“附件”是 read()方法的第三个参数。在本代码中,它是 ByteBuffer,数据也被读取。
(3)如果读取操作失败,则将调用 CompletionHandlerfailed()方法。

4、通过Future写数据

Path path = Paths.get("E:\\java\\nio\\test\\test.txt");
AsynchronousFileChannel fileChannel = null;
try {
    fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
} catch (IOException e) {
    e.printStackTrace();
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
long position = 0
    
byteBuffer.put("test future write".getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
Future<Integer> write = fileChannel.write(byteBuffer, position);
byteBuffer.clear();
while(!write.isDone());
System.out.println("write over");

首先,AsynchronousFileChannel 以写模式打开。然后创建一个 ByteBuffer,并将
一些数据写入其中。然后,ByteBuffer 中的数据被写入到文件中。最后,示例检查返
回的 Future,以查看写操作完成时的情况。
注: 文件必须已经存在。如果该文件不存在,那么 write()方法将抛出一个
java.nio.file.NoSuchFileException

5、通过CompletionHandler写数据

Path path = Paths.get("E:\\java\\nio\\test\\test.txt");
AsynchronousFileChannel fileChannel = null;
try {
    fileChannel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
} catch (IOException e) {
    e.printStackTrace();
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
long position = 0
    
byteBuffer.put("test CompletionHandler write".getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
fileChannel.write(byteBuffer, position, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        System.out.println("bytes has been written");
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.out.println("written failed");
    }
});

当写操作完成时,将会调用 CompletionHandler 的completed()方法。如果写失败,
则会调用failed()方法。

7.4 字符集(Charset)

Java中使用Charset来表示字符集编码对象

Charset常用静态方法

public static Charset forName(String charsetName)//通过编码类型获得 Charset 对象
public static SortedMap<String,Charset> availableCharsets()//获得系统支持的所有编码方式
public static Charset defaultCharset()//获得虚拟机默认的编码方式
public static boolean isSupported(String charsetName)//判断是否支持该编码类型

Charset常用普通方法

public final String name()//获得 Charset 对象的编码类型(String)
public abstract CharsetEncoder newEncoder()//获得编码器对象
public abstract CharsetDecoder newDecoder()//获得解码器对象

字符集demo

 Charset charset = Charset.forName("UTF-8");
// 1.获取编码器
CharsetEncoder charsetEncoder = charset.newEncoder();
// 2.获取解码器
CharsetDecoder charsetDecoder = charset.newDecoder();
// 3.获取需要解码的数据
CharBuffer charBuffer = CharBuffer.allocate(1024);
charBuffer.put("字符集编码测试");
charBuffer.flip();

// 4.编码
ByteBuffer byteBuffer = charsetEncoder.encode(charBuffer);
System.out.println("编码后: ");
for (int i = 0; i < byteBuffer.limit(); i++) {
    System.out.print(byteBuffer.get() + " ");
}
System.out.println();

byteBuffer.flip();
// 5.解码
CharBuffer charBufferDecode = charsetDecoder.decode(byteBuffer);
System.out.println("解码后: ");
System.out.println(charBufferDecode);
System.out.println("指定其他格式解码: ");
Charset charset1 = Charset.forName("GBK");
byteBuffer.flip();
CharBuffer charBuffer1 = charset1.decode(byteBuffer);
System.out.println(charBuffer1);

// 6.获取Charset所支持的字符编码
SortedMap<String, Charset> map = Charset.availableCharsets();
map.forEach((k,v) -> {
  System.out.println("key: " + k + ", value: " + v);
});

注:jdk1.7以后部分编码格式可以使用StandardCharsets类来进行上述的编码解码操作

例:根据UTF-8进行编码

ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode("test");
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);

第八章 NIO 小案例

服务端代码

ChatServer类:

public class ChatServer {

    // 启动服务器
    private void startServer() throws IOException {
        // 创建selector
        Selector selector = Selector.open();
        // 创建channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口
        serverSocketChannel.bind(new InetSocketAddress(8000));
        // 配置非阻塞
        serverSocketChannel.configureBlocking(false);
        // 注册到selector
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务器启动成功,等待连接中....");
        // 等待连接接入
        for (;;) {
            // 获取channel数量
            int readChannels = selector.select();

            System.out.println("等待连接接入");
            if (readChannels == 0) {
                continue;
            }

            // 从selector获取可用的channel, 所获取的为等待IO操作(及有事件发生)channel的selectionKey
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 遍历selection集合
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            if (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 操作channel, 操作了channel之后,此channel不该留在selector中, 将其移除
                iterator.remove();
                // 如果为接收连接状态
                if (selectionKey.isAcceptable()) {
                    acceptOperator(serverSocketChannel, selector);
                }
                // 如果为读取状态
                if (selectionKey.isReadable()) {
                    readOperator(selector, selectionKey);
                }
            }
        }
    }

    // 类型为读取状态的操作
    private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException {
        // 获取channel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // 创建buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        // 读取客户端消息
        int readLength = socketChannel.read(byteBuffer);
        String msg = "";
        if (readLength > 0) {
            // 切换读模式
            byteBuffer.flip();

            msg += StandardCharsets.UTF_8.decode(byteBuffer);
        }
        // 将channel再次注册到selector上,可读状态
        socketChannel.register(selector, SelectionKey.OP_READ);
        // 将消息广播到其他客户端
        if (msg.length() > 0) {
            System.out.println(msg);
            castOtherClient(msg, selector, socketChannel);
        }
    }

    // 将消息广播到其他客户端
    private void castOtherClient(String msg, Selector selector, SocketChannel socketChannel) throws IOException {
        // 获取所有channel
        Set<SelectionKey> selectionKeySet = selector.keys();
        // 广播消息
        for (SelectionKey selectionKey : selectionKeySet) {
            Channel targetChannel = selectionKey.channel();
            // 不需要给自己发送
            if (targetChannel instanceof SocketChannel && socketChannel != targetChannel) {
                ((SocketChannel) targetChannel).write(StandardCharsets.UTF_8.encode(msg));
            }
        }

    }

    // 类型为接入状态的操作
    private void acceptOperator(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
        SocketChannel socketChannel = serverSocketChannel.accept();
        // 配置非阻塞
        socketChannel.configureBlocking(false);
        // 注册进selector,进行下一步的操作
        socketChannel.register(selector, SelectionKey.OP_READ);
        // 回复客户端提示信息
        socketChannel.write(StandardCharsets.UTF_8.encode("欢迎进入聊天室,请开始你们的聊天吧!"));
    }

    public static void main(String[] args) {
        try {
            new ChatServer().startServer();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客户端代码

ChatClient类:

public class ChatClient {

    // 启动客户端
    public void startClient(String name) throws IOException {
        // 连接服务端
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8000));
        // 接收服务端响应的数据
        Selector selector = Selector.open();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        new Thread(new ClientThread(selector)).start();

        // 向服务端发送消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String msg = scanner.nextLine();
            if (msg.length() > 0) {
                socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + msg));
            }
        }
    }
}

ClientThread类:

public class ClientThread implements Runnable{

    private Selector selector;

    public ClientThread(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            for (;;) {
                int readChannels = selector.select();
                if (readChannels == 0) {
                    continue;
                }
                // 遍历选择器
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                // 获取对应的channel
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    // 获取channel
                    SelectionKey selectionKey = iterator.next();
                    // 从集合中移除
                    iterator.remove();
                    // 如果为可读状态
                    if (selectionKey.isReadable()) {
                        readOperator(selector, selectionKey);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 类型为读取状态的操作
    private void readOperator(Selector selector, SelectionKey selectionKey) throws IOException {
        // 获取channel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // 创建buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        // 读取客户端消息
        int readLength = socketChannel.read(byteBuffer);
        String msg = "";
        if (readLength > 0) {
            // 切换读模式
            byteBuffer.flip();

            msg += StandardCharsets.UTF_8.decode(byteBuffer);
        }
        // 将channel再次注册到selector上,可读状态
        socketChannel.register(selector, SelectionKey.OP_READ);
        if (msg.length() > 0) {
            System.out.println(msg);
        }
    }
}

AClient类:

public class AClient {

    public static void main(String[] args) {
        try {
            new ChatClient().startClient("lucy");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

BClient类:

public class BClient {
    public static void main(String[] args) {
        try {
            new ChatClient().startClient("david");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

文章作者: 琉璃夜空
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 琉璃夜空 !
  目录