NIO与网络编程——通道和FileChannel类的使用

PunkLu 2020年01月06日 133次浏览
通道和FileChannel类的使用
# 通道和FileChannel的的使用

在NIO技术中,要将操作的数据打包到缓冲区中,而缓冲区中的数据想要传输到目的地是要依赖于通道的,缓冲区是将数据进行打包,而通道是将数据进行传输。

通道就是用来传输数据的通路。NIO技术中的数据要放在缓冲区中进行管理,在使用通道将缓冲区中的数据传输到目的地。

通道概述

缓冲区都是类,而通道都是接口,这是由于通道的实现都是要依赖于操作系统的,Channel接口只定义有哪些功能,而功能的具体实现在不同的操作系统中是不一样的。因此,在JDK中,通道被设计为接口数据类型。

通道接口的层次结构

NIO技术中的通道是一个接口,查看Channel接口的源码可以发现,Channel接口继承自Closeable接口,而Closeable接口继承自AutoCloseable接口,AutoCloseable接口的作用是可以自动关闭,而不需要显式地调用close()方法,示例代码:

public class DBOperate implements AutoCloseable{

    @Override
    public void close() throws Exception {
        System.out.println("关闭连接");
    }
}
public class TestChannelAutoCloseable {

    public static void main(String[] args) {
        // 如果try后的小括号中有多条语句,则最后一条是没有分号的
        // 并且小括号中的变量都要实现AutoCloseable接口
        try (DBOperate dbo = new DBOperate()){
            System.out.println("使用" + dbo + "开始数据库的操作");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

运行结果:

使用tech.punklu.niosocket.chapter2.DBOperate@1b6d3586开始数据库的操作
关闭连接

DBOPerate类实现了AutoCloseable接口,使DBOperate类具有close()方法自动关闭资源的功能。

通道可处于打开或关闭两种状态,一旦关闭了某个通道,则试图对其调用I/O操作时就会导ClosedChannelException异常被抛出,但可以通过调用通道的isOpen()方法测试通道是否处于打开状态.一般情况下,通道对于多线程的访问是安全的。

在JDK1.8版本中,Channel接口具有11个子接口:

  1. AsynchronousChannel

  2. AsynchronousByteChannel

  3. ReadableByteChannel

  4. ScatteringByteChannel

  5. WritableByteChannel

  6. GatheringByteChannel

  7. ByteChannel

  8. SeekableByteChannel

  9. NetworkChannel

  10. MulticastChannel

  11. InterruptibleChannel

FileChannel类的使用

FileChannel类的主要作用是读取、写入、映射和操作文件的通道.该通道永远是阻塞的操作。

FileChannel类在内部维护当前文件的position,可对其进行查询和修改.该文件本身包含一个可读写、长度可变的字节序列,并且可以查询该文件的当前大小。当写入的字节超出文件的当前大小时,则增加文件的大小;截取该文件时,则减小文件的大小。文件可能还有某个相关联的元数据,如访问权限、内容类型和最后修改的时间,但此类未定义访问元数据的方法。

除了字节通道中常见的读取、写入和关闭操作外,此类还定义了下列特定于文件的操作:

  1. 以不影响通道当前位置的方式,对文件中绝对位置的字节进行读取或写入
  2. 将文件中的某个区域直接映射到内存中.对于较大的文件,这通常比调用普通的read()或write()方法更为高效
  3. 强制对底层存储设备进行文件更新,确保在系统崩溃时不丢失数据
  4. 以一种可被许多操作系统优化为直接向文件系统缓存发送或从中读取的高速传输方法,将字节从文件传输到某个其他通道中,反之亦然
  5. 可以锁定某个文件区域,以阻止其他程序对其进行访问

多个并发线程可安全地使用文件通道。此类没有定义打开现有文件或创建新文件的方法,可从现有的FileInputStream、FileOutoutStream或RandomAccessFile对象获得文件通道,方法是调用该对象的getChannels方法,这会返回一个连接到相同底层文件的文件通道。

写操作与位置的使用

int write(ByteBuffer src)方法的作用是将remaining字节序列从给定的缓冲区写入此通道的当前位置,此方法的行为与WritabeByteChannel接口所指定的行为完全相同:在任意给定时刻,一个可写入通道上只能进行一个写入操作。如果某个线程在通道上发起写入操作,那么在第一个操作完成之前,将阻塞其他所有试图发起另一个写入操作的线程。其他种类的I/O操作是否继续与写入操作并发执行,取决于该通道的类型。该方法的返回值代表写入的字节数,可能为零。

WrirableByteChannel接口的特点:

  1. 将一个ByteBuffer缓冲区中的remaining字节序列写入通道的当前位置
  2. write(ByteBuffer)是同步的

long position()方法的作用是返回此通道的文件位置。

public abstract FileChannel position(long newPosition)方法的作用是设置此通道的文件位置。

示例代码:

public class TestChannelWriteAtCurrentPosition {

    public static void main(String[] args) throws IOException,InterruptedException {
        FileOutputStream fosRef = new FileOutputStream(new File("C:\\abc\\a.txt"));
        FileChannel fileChannel = fosRef.getChannel();
        try {
            ByteBuffer buffer = ByteBuffer.wrap("abcde".getBytes());
            System.out.println("A fileChannel.position()=" + fileChannel.position());
            System.out.println("Write() 1 返回值:" +fileChannel.write(buffer));
            System.out.println("B fileChannel.position()=" + fileChannel.position());

            fileChannel.position(2);
            buffer.rewind(); // 还原buffer的position为0
            // 然后在当前位置position中再进行写入
            System.out.println("write() 2返回值:"+ fileChannel.write(buffer));
            System.out.println("C fileChannel.position()=" + fileChannel.position());
        }catch (IOException e){
            e.printStackTrace();
        }
        fileChannel.close();
        fosRef.close();
    }
}

运行结果:

A fileChannel.position()=0
Write() 1 返回值:5
B fileChannel.position()=5
write() 2返回值:5
C fileChannel.position()=7

运行结果说明:int write(ByteBuffer src)是从当前位置开始写入的。

示例代码:

public class TestWriteRemaining {


    public static void main(String[] args) throws IOException {
        FileOutputStream fosRef = new FileOutputStream(new File("c:\\abc\\b.txt"));
        FileChannel fileChannel = fosRef.getChannel();
        try {
            ByteBuffer buffer1 = ByteBuffer.wrap("abcde".getBytes());
            ByteBuffer buffer2 = ByteBuffer.wrap("12345".getBytes());
            fileChannel.write(buffer1);
            buffer2.position(1);
            buffer2.limit(3);
            fileChannel.position(2);
            fileChannel.write(buffer2);
        } catch (IOException e) {
            e.printStackTrace();
        }
        fileChannel.close();
        fosRef.close();
    }
}

运行输出结果:

ab23e

运行结果说明:int write(ByteBuffer src)方法将ByteBuffer的remaining写入通道。

示例代码:

public class TestWriteSync {

    private static FileOutputStream fosRef;
    private static FileChannel fileChannel;

    public static void main(String[] args) throws IOException,InterruptedException {
        fosRef = new FileOutputStream(new File("c:\\abc\\c.txt"));
        fileChannel = fosRef.getChannel();
        for (int i = 0; i < 10; i++) {
            Thread thread1 = new Thread(){
                @Override
                public void run() {
                    try{
                        ByteBuffer buffer = ByteBuffer.wrap("abcde\r\n".getBytes());
                        fileChannel.write(buffer);
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                }
            };

            Thread thread2 = new Thread(){
                @Override
                public void run() {
                    try{
                        ByteBuffer buffer = ByteBuffer.wrap("我是中国人\r\n".getBytes());
                        fileChannel.write(buffer);
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                }
            };
            thread1.start();
            thread2.start();
        }
        Thread.sleep(3000);
        fileChannel.close();
        fosRef.close();
    }
}

运行结果:

我是中国人
我是中国人
abcde
我是中国人
abcde
我是中国人
abcde
我是中国人
abcde
我是中国人
我是中国人
abcde
abcde
我是中国人
abcde
我是中国人
abcde
abcde
我是中国人
abcde

运行结果说明:int write(ByteBuffer src)方法具有同步特性。

读操作

int read(ByteBuffer dst)方法的作用是将字节序列从此通道的当前位置读入给定的缓冲区的当前位置。此方法的行为与ReadableByteChannel接口中指定的行为完全相同:在任意给定时刻,一个可读取通道上只能进行一个读取操作。如果某个线程在通道上发起读取操作,那么在第一个操作完成之前,将阻塞其他所有试图发起另一个读取操作的线程。其他种类的I/O操作是否继续与读取操作并发执行,取决于该通道的类型。方法的返回值代表读取的字节数,可能为零。如果该通道已到达流的末尾,则返回-1。

ReadableByteChannel接口有以下两个特点:

  1. 将通道当前位置中的字节序列读入一个ByteBuffer缓冲区中的remaining空间中。
  2. read(ByteBuffer)方法是同步的

示例代码:

public class TestReturnValue {

    private static FileInputStream fisRef;
    private static FileChannel fileChannel;

    public static void main(String[] args) throws IOException,InterruptedException {
        fisRef = new FileInputStream(new File("C:\\abc\\b.txt"));
        fileChannel = fisRef.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
        System.out.println(fileChannel.position());
        int readLength = fileChannel.read(byteBuffer);
        System.out.println(readLength); // 取得5个字节

        // 将下面的代码添加注释,那么再次执行read()方法时,
        // 返回值是0,因为byteBuffer没有remaining剩余空间
        //byteBuffer.clear();
        System.out.println(fileChannel.position());
        readLength = fileChannel.read(byteBuffer);
        System.out.println(readLength); // 取得0个字节

        // 执行clear()方法,使缓冲区状态还原
        byteBuffer.clear();
        System.out.println(fileChannel.position());
        readLength = fileChannel.read(byteBuffer);
        System.out.println(readLength); // 到达流的末尾值为-1
        byteBuffer.clear();

        fileChannel.close();
        fisRef.close();

    }
}

int read(ByteBuffer dst)方法返回值的意义:

  1. 正数,代表从通道的当前位置向ByteBuffer缓冲区读的字节个数
  2. 0:代表从通道中没有读取任何的数据,也就是0字节,有可能发生的情况是缓冲区没有剩余空间了
  3. -1:代表到达流的末端

示例代码1:

public class TestFromChannelCurrentPosition {

    private static FileInputStream fisRef;
    private static FileChannel fileChannel;

    public static void main(String[] args) throws IOException,InterruptedException {
        fisRef = new FileInputStream(new File("c:\\abc\\b.txt"));
        fileChannel = fisRef.getChannel();
        fileChannel.position(2);

        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
        fileChannel.read(byteBuffer);

        byte[] getByteArray = byteBuffer.array();
        for (int i = 0; i < getByteArray.length; i++) {
            System.out.print((char)getByteArray[i]);
        }

        fileChannel.close();
        fisRef.close();
    }
}

运行结果:

23e

运行结果说明:int read(ByteBuffer dst)方法是从通道的当前位置开始读取的

示例代码2:

public class TestPutOnBufferCurrentPosition {

    private static FileInputStream fisRef;
    private static FileChannel fileChannel;

    public static void main(String[] args) throws IOException,InterruptedException {
        fisRef = new FileInputStream(new File("c:\\abc\\b.txt"));
        fileChannel = fisRef.getChannel();
        fileChannel.position(2);

        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
        byteBuffer.position(3);

        // 向ByteBuffer读入cd
        fileChannel.read(byteBuffer);

        byte[] getByteArray = byteBuffer.array();

        for (int i = 0; i < getByteArray.length; i++) {
            if (getByteArray[i] == 0){
                System.out.print("空格");
            }else {
                System.out.print((char)getByteArray[i]);
            }
        }

        fileChannel.close();
        fisRef.close();
    }
}

运行结果:

空格空格空格23

运行结果说明:int read(ByteBuffer dst)方法将字节放入ByteBuffer当前位置

示例代码3:

public class TestFileChannelReadSync {

    private static FileInputStream fisRef;
    private static FileChannel fileChannel;

    public static void main(String[] args) throws IOException,InterruptedException {
        fisRef = new FileInputStream(new File("c:\\abc\\d.txt"));
        fileChannel = fisRef.getChannel();

        for (int i = 0; i < 1; i++) {
            Thread thread1 = new Thread(){
                @Override
                public void run() {
                    try {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
                        int readLength = fileChannel.read(byteBuffer);
                        while (readLength != -1){
                            byte[] getByte = byteBuffer.array();
                            System.out.println(new String(getByte,0,readLength));
                            byteBuffer.clear();
                            readLength = fileChannel.read(byteBuffer);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            };

            Thread thread2 = new Thread(){
                @Override
                public void run() {
                    try {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(5);
                        int readLength = fileChannel.read(byteBuffer);
                        while (readLength != -1){
                            byte[] getByte = byteBuffer.array();
                            System.out.println(new String(getByte,0,readLength));
                            byteBuffer.clear();
                            readLength = fileChannel.read(byteBuffer);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            };

            thread1.start();
            thread2.start();
        }
        Thread.sleep(3000);
        fileChannel.close();
        fisRef.close();
    }
}

运行结果说明:

read方法具有同步特性,两个线程一起打印出了文本文件中的内容,利用了通道每次read都是从当前
position开始的特性,两个线程交替输出文本中的内容

示例代码4:

public class TestFileChannelReadRemaining {

    public static void main(String[] args) throws IOException {
        FileInputStream fileInputStream = new FileInputStream(new File("c:\\abc\\a.txt"));
        FileChannel fileChannel = fileInputStream.getChannel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(100);
        byteBuffer.position(1);
        byteBuffer.limit(3);
        fileChannel.read(byteBuffer);
        fileChannel.close();
        fileInputStream.close();

        byteBuffer.rewind();

        for (int i = 0; i < byteBuffer.limit(); i++) {
            byte eachByte = byteBuffer.get();
            if (eachByte == 0){
                System.out.print("空格");
            }else {
                System.out.print((char)eachByte);
            }
        }
    }
}

运行结果:

空格ab

运行结果说明:缓冲区中remaining有多少空间,就从通道中读取多少数据

批量写操作

long write(ByteBuffer[] src)方法的作用是将每个缓冲区中的remaining字节序列写入此通道的当前位置。调用此方法的形式为c.write(srcs),该调用与调用c.write(srcs,0,srcs.length)的形式完全相同。

long write(ByteBuffer[] srcs)方法实现的是GatheringByteChannel接口中的同名方法。接口GatheringByteChannel的父接口是WritableByteChannel,说明接口GatheringByteChannel具有WritableByteChannel接口的以下两个特性。

  1. 将一个ByteBuffer缓冲区中的remaining字节序列写入通道的当前位置中
  2. write(ByteBuffer) 方法是同步的

此外,它还具有第三个特性:将多个ByteBuffer缓冲区中的remaining剩余字节序列写入通道的当前位置中。

批量读操作

long read(ByteBuffer[] dsts)方法的作用是将字节序列从此通道当前位置的字节序列读入给定的缓冲区数组中的每个缓冲区的当前位置。调用此方法的形式为c.read(dsts),该调用与调用c.read(dsts,0,dsts.length)的形式完全相同。

long read(ByteBuffer[] dsts)方法实现的是ScatteringByteBuffer接口中的同名方法,而接口ScatteringByteBuffer的父接口是ReadableByteChannel,说明接口ScatteringByteBuffer具有ReadableByteChnanel接口的以下两个特性:

  1. 将通道当前位置的字节序列读入1个ByteBuffer缓冲区的remaining空间中
  2. read(ByteBuffer)方法是同步的

此外,它还具有第三个特性:将通道当前位置的字节序列读入多个ByteBuffer缓冲区的remaining剩余空间中。

部分批量写操作

long write(ByteBuffer[] srcs,int offset,int length)方法的作用是以指定缓冲区数组的offset下表开始,向后使用length个字节缓冲区,再将这length个字节缓冲区的remaining剩余字节子序列写入此通道的当前位置。

参数的作用说明:

  1. offset:第一个缓冲区(要获取该缓冲区中的字节)在缓冲区数组中的偏移量
  2. length:要访问的最大缓冲区数

long write(ByteBuffer[] srcs,int offset,int length)方法实现的是GatheringByteChannel接口中的同名方法,而接口GatheringByteChannel的父接口是WritableByteChannel,说明接口GatheringByteChannel也具有WritableByteChannel接口的以下两个特性:

  1. 将一个ByteBuffer缓冲区中的remaining字节序列写入通道的当前位置
  2. write(ByteBuffer)方法是同步的

部分批量读操作

long read(ByteBuffer[] dsts,int offset,int length)方法的作用是将通道中当前位置的字节序列写入下标为offset开始的ByteBuffer[]数组中的remaining剩余空间中,并且连续写入length个ByteBuffer缓冲区。

向通道的指定position位置写入数据

write(ByteBuffer src,long position)方法的作用是将缓冲区的remaining剩余字节序列写入通道的指定位置。此方法具有同步特性。

参数src代表要传输其中字节的缓冲区。position代表将要写入的通道的位置,必须为非负数。除了从给定的文件位置开始写入各字节,而不是从该通道的当前位置外,此方法的执行方式与write(ByteBuffer)方法相同,此方法不修改此通道的位置。如果给定的位置大于该通道的当前大小,则该文件将扩大以容纳新的字节,在文件之前的末尾和新写入字节之间的字节值是为指定的。

读取通道指定位置的数据

read(ByteBuffer dst,long position)方法的作用是将通道的指定位置的字节序列读入给定的缓冲区的当前位置。除了从给定的文件位置开始读取各字节,而不是从该通道的当前位置外,此方法的执行方式与read(ByteBuffer)方法相同。此方法不修改此通道的位置。如果给定的位置大于该文件的当前大小,则不读取任何字节。

设置位置与获得大小

position(long newPosition)方法的作用是设置此通道的文件位置。

long size()方法的作用是返回此通道关联的文件的当前大小。

截断缓冲区

truncate(long size)方法的作用是将此通道的文件截取为给定大小。如果给定大小小于该文件的当前大小,则截取该文件,丢弃文件新末尾后面的所有字节。如果给定大小大于或等于该文件的当前大小,则不修改文件。

将数据传输到其他可写入字节通道

long transferTo(position,count,WritableByteChannel dest)方法的作用是将字节从此通道的给定position处开始的count个字节传输到给定的可写入字节通道的当前位置。transferTo()方法的功能相当于write方法,只不过是将通道中的数据传输到另一个通道中,而不是缓冲区中。

示例代码:

/**
 * e本来的值:123456789
 * f本来的值:abcdefg
 *
 * 执行完后,e的值变为123cde789
 */
public class TestTransferTo {

    public static void main(String[] args) throws IOException {
        RandomAccessFile fileA = new RandomAccessFile("c:\\abc\\f.txt","rw");
        RandomAccessFile fileB = new RandomAccessFile("c:\\abc\\e.txt","rw");

        FileChannel fileChannel1 = fileA.getChannel();
        FileChannel fileChannel2 = fileB.getChannel();

        fileChannel2.position(3);
        fileChannel1.transferTo(2,3,fileChannel2);

        fileChannel1.close();
        fileChannel2.close();

        fileA.close();
        fileB.close();
    }
}

从执行结果看,c.transferTo(position,count,WritableByteChannel)是将c中的position处开始的count个字节传输到了WritableByteChannel的当前位置处。此外,如果c从position开始的剩余字节数小于count,则只传输剩余的字节数到WritableByteChannel中。

将字节从给定可读取字节通道传输到此通道的文件中

long transferFrom(ReadableByteChannel src,position,count)方法的作用是将字节从给定的可读取字节通道传输到此通道的文件中。此方法不修改此通道的位置,如果给定的位置大于该文件的当前大小,则不传输任何字节。从源通道中的当前位置开始读取各字节写入到当前通道,然后将src通道的位置增加读取的字节数。

参数说明:

  1. src:源通道
  2. position:当前通道要写入字节的位置,从此位置开始传输
  3. count:要传输的最大字节数

示例代码:

/***
 * g最开始的内容:abcdefg
 * h最开始的内容:123456789
 *
 * 执行完后,g变成abc56fg
 */
public class TestTransferFrom {

    public static void main(String[] args) throws IOException {
        RandomAccessFile fileA = new RandomAccessFile("c:\\abc\\g.txt","rw");
        RandomAccessFile fileB = new RandomAccessFile("c:\\abc\\h.txt","rw");

        FileChannel fileChannelA = fileA.getChannel();

        FileChannel fileChannelB = fileB.getChannel();

        fileChannelB.position(4);

        long readLength = fileChannelA.transferFrom(fileChannelB,3,2);
        System.out.println(readLength);

        fileChannelA.close();
        fileChannelB.close();

        fileA.close();
        fileB.close();
    }
}

判断当前通道是否打开

示例代码:

public class TestChannelIsOpen {

    public static void main(String[] args) throws IOException {
        File file = new File("c:\\abc\\a.txt");
        RandomAccessFile fileA = new RandomAccessFile(file,"rw");
        FileChannel fileChanneA = fileA.getChannel();
        System.out.println(fileChanneA.isOpen());
        fileChanneA.close();
        System.out.println(fileChanneA.isOpen());
    }
}

运行结果:

true
false