NIO与网络编程——AIO的使用

PunkLu 2020年01月08日 67次浏览
AIO的使用

AIO的使用

AsynchronousFileChannel类的使用

AsynchronousFileChannel类用于读取、写入和操作文件的异步通道。在通过调用此类定义的open()方法打开文件时,将创建一个异步文件通道。该文件包含可读写的、可查询其大小的可变长度的字节序列。当写入字节超出其当前大小时,文件的大小会增加。文件的大小在截断时会减小。

获取此通道文件的独占锁

public final Future lock()方法的作用是获取此通道文件的独占锁。此方法启动一个操作以获取此通道的文件的独占锁。该方法返回一个表示操作挂起结果的Future对象。Future的get()方法在成功完成时返回FileLock。返回值表示待定结果的Future对象。

示例代码1:

public class TestLock1 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        Future<FileLock> future = channel.lock();
        FileLock lock = future.get();
        System.out.println("A get lock time=" + System.currentTimeMillis());
        Thread.sleep(8000); // 留出一点时间,用来启动TestLock2类
        lock.release();
        System.out.println("A release lock time=" + System.currentTimeMillis());
        channel.close();

    }
}

示例代码2:

public class TestLock2 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        System.out.println("lock begin " + System.currentTimeMillis());
        Future<FileLock> future = channel.lock();
        System.out.println("lock end " + System.currentTimeMillis());
        FileLock lock = future.get();
        System.out.println("B get lock time= " + System.currentTimeMillis());
        lock.release();
        channel.close();
    }
}

首先运行TestLock1,然后运行TestLock2.可以看到,在TestLock1中释放了独占锁后,TestLock2才成功获取到独占锁。这个方法可以用于保证单个线程在某个时间点上对于文件的独占锁定。

获取通道文件给定区域的锁

public abstract Future lock(long position,long size,boolean shared)方法的作用是获取此通道文件给定区域的锁。此方法启动一个操作以获取此信道文件的给定区域的锁。该方法的行为与lock(long,long,boolean,Object,CompletionHandler)方法完全相同,不同之处在于,此方法不指定CompletionHandler程序,而是返回一个表示待定结果的Future对象。Future的get()方法在成功完成时返回FileLock。

参数position代表锁定区域的起始位置,必须为非负数。size代表锁定区域的大小,必须是非负数。shared值为true代表请求的是共享锁,在这种情况下,此通道必须为读取(并可能写入)打开,如果请求排他锁,在这种情况下,此通道必须为写入而打开(并且可能读取)。返回值代表待定结果的Future对象。

示例代码1:

public class TestLockPartition1 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        Future<FileLock> future = channel.lock(0,3,false);
        FileLock lock = future.get();
        System.out.println("A get lock time= " + System.currentTimeMillis());
        Thread.sleep(8000); // 留出一些时间,用来启动TestLockPartition2类
        lock.release();
        System.out.println("A release lock time=" + System.currentTimeMillis());
        channel.close();
    }
}

示例代码2:

public class TestLockPartition2 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        System.out.println("B lock begin " + System.currentTimeMillis());
        Future<FileLock> future = channel.lock(0,3,false);
        System.out.println("B lock end " + System.currentTimeMillis());
        FileLock lock  = future.get();
        System.out.println("B get lock time=" + System.currentTimeMillis());
        lock.release();
        channel.close();
    }
}

示例代码3:

public class TestLockPartition3 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        System.out.println("C lock begin " + System.currentTimeMillis());
        Future<FileLock> future = channel.lock(4,4,false);
        System.out.println("C lock end " + System.currentTimeMillis());
        FileLock lock = future.get();
        System.out.println("C get lock time=" + System.currentTimeMillis());
        lock.release();
        channel.close();
    }
}

可以看出TestLockPartition1和TestLockPartition2锁定的都是范围内的前3个数据区域,而TestLockPartition3锁定的是第4至第7个数据。所以,当首先运行TestLockPartition1,再运行TestLockPartition2时,会出现阻塞,TestLockPartition1需要等到TestLockPartition2释放掉锁后才可以获取到锁并进行下面的锁,但是TestLockPartition1和TestLockPartition3可以同时获取到锁。

在两个进程对同一个文件的锁定范围有重叠时,会出现阻塞的状态。

返回此通道文件当前大小与通道打开状态

public abstract long size()方法的作用是返回此通道文件的当前大小。

public boolean isOpen()方法的作用是判断通道是否呈打开的状态。

a.txt文件的内容如下:12345

示例代码:

public class TestSizeAndIsOpen {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        System.out.println("File size=" + channel.size());
        System.out.println("A isOpen=" + channel.isOpen());
        channel.close();
        System.out.println("B isOpen=" + channel.isOpen());
    }
}

运行结果:

File size=5
A isOpen=true
B isOpen=false

CompletionHandler接口的使用

public final void lock(A attachment,CompletionHandler<FileLock,? super A> handler)方法的作用是获取此通道文件的独占锁。此方法启动一个操作以获取此通道文件的给定区域的锁。handler参数是在获取锁(或操作失败)时调用的CompletionHandler对象。传递给CompletionHandler的结果是生成的FileLock。

参数A代表附件的数据类型。参数attachment代表要附加到IO操作的对象,可以为空。CompletionHandler代表处理程序,用于消耗结果的处理程序。

示例代码:

public class TestCompletionHandler {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        System.out.println("begin time=" + System.currentTimeMillis());
        channel.lock("我是附加值", new CompletionHandler<FileLock, String>() {
            @Override
            public void completed(FileLock result, String attachment) {
                try{
                    System.out.println("public void completed(FileLock result,String attachment)attachment=" + attachment);
                    System.out.println("completed time=" + System.currentTimeMillis());
                    result.release();
                    channel.close();
                    System.out.println("release end close");
                }catch (IOException e){
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                System.out.println("public void failed(Throwable exc,String attachment) attachment=" + attachment);
                System.out.println("getMessage=" + exc.getMessage());
            }
        });
        System.out.println("   end time=" + System.currentTimeMillis());
        Thread.sleep(3000);
    }
}

运行结果:

begin time=1578457643056
   end time=1578457643064
public void completed(FileLock result,String attachment)attachment=我是附加值
completed time=1578457643065
release end close

从运行结果看,是将程序运行完之后,再运行CompletionHandler中的completed方法。begin和end的时间非常接近,几乎是相同的时间,这就是异步的优势。

而CompletionHandler中的public void failed(Throwable exc,A attachment)方法被调用的时机是出现I/O操作异常时。

failed()方法的示例代码为:

public class TestFailed {

    public static void main(String[] args) throws IOException,InterruptedException {
        Path path = Paths.get("c:\\abc\\abc.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE,StandardOpenOption.READ);
        channel.close();
        channel.lock("我是字符串我是附件", new CompletionHandler<FileLock, String>() {
            @Override
            public void completed(FileLock result, String attachment) {
                try {
                    result.release();
                }catch (IOException e){
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                System.out.println("public void failed(Throwable exc,String attachment)");
                System.out.println("attachment=" + attachment + " exc.getMessage()=" + exc.getMessage());
                System.out.println("exc.getClass().getName()=" + exc.getClass().getName());
            }
        });
        Thread.sleep(3000);
    }
}

可以看到,因为在lock()方法执行前先执行了channel.close()方法关闭了通道,所以出现了异常,调用了failed()方法。

此外还有:

  1. 执行指定范围的锁定与传入附件及整合接口的方法public abstract void lock(long position,long size,boolean shared,A attachment,CompletionHandler<FileLock,? super A> handler)
  2. 执行锁定与传入附件及整合接口的方法public final void lock(A attachment,CompletionHandler<FileLock,? super A> handler)

这两个方法的使用方法与之前介绍的大同小异,不再赘述。

读取数据方式1

public abstract Future read(ByteBuffer dst,long position)方法的作用是从给定的文件位置开始,从该通道将字节序列读入给定的缓冲区。此方法从给定的文件位置开始,将从该通道的字节序列读取到给定的缓冲区。此方法返回Future对象。如果给定位置大于或等于在尝试读取时文件的大小,则Future的get()方法将返回读取的字节数或-1。参数dst代表要将字节传输到的缓冲区。参数position代表AsynchronousFileChannel代表的文件开始读取的位置,必须是非负数。

示例代码:

public class TestRead1 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {

        // a.txt的文件内容为12345
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(3);
        Future<Integer> future = channel.read(buffer,0);
        System.out.println("length=" + future.get());
        channel.close();
        byte[] byteArray = buffer.array();
        for (int i = 0; i < byteArray.length; i++) {
            System.out.print((char)byteArray[i]);
        }
    }
}

读取数据方式2

除了上面说的正常读取文件内容,也可以使用public abstract Future read(ByteBuffer dst,long position,A attachment,CompletionHandler<FileLock,? super A> handler)的方式向目标缓冲区中添加自定义的内容。

示例代码:

public class TestRead2 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {

        // a.txt的文件内容为12345
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(3);
        channel.read(buffer, 0, "我是附加的参数", new CompletionHandler<Integer, String>() {
            @Override
            public void completed(Integer result, String attachment) {
                System.out.println("public void completed(Integer result,String attachment) result=" + result + " attachment=" + attachment);
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                System.out.println("public void failed(Throwable exc,String attachment) attachment=" + attachment);
                System.out.println("getMessage=" + exc.getMessage());
            }
        });
        channel.close();
        Thread.sleep(2000);
        byte[] byteArray = buffer.array();
        for (int i = 0; i < byteArray.length; i++) {
            System.out.print((char)byteArray[i]);
        }
    }
}

写入数据方式1

a.txt文件的内容:12345

示例代码:

public class TestWrite1 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        // a.txt文件的内容:12345
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        ByteBuffer buffer = ByteBuffer.wrap("abcde".getBytes());
        Future<Integer> future = channel.write(buffer,channel.size());
        System.out.println("length= " + future.get());
        channel.close();
    }
}

运行完成后,a.txt的内容变成12345abcde

写入数据方式2

a.txt内容为12345

示例代码:

public class TestWrite2 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        // a.txt文件的内容:12345
        Path path = Paths.get("c:\\abc\\a.txt");
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(path, StandardOpenOption.WRITE);
        ByteBuffer buffer = ByteBuffer.wrap("abcde".getBytes());
        channel.write(buffer, channel.size(), "我是附加的数据", new CompletionHandler<Integer, String>() {
            @Override
            public void completed(Integer result, String attachment) {
                System.out.println("public void completed(Integer result,String attachment) result=" + result + " attachment=" + attachment);
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                System.out.println("public void failed(Throwable exc,String attachment) attachment=" + attachment);
                System.out.println("getMessage=" + exc.getMessage());
            }
        });
        channel.close();
        Thread.sleep(2000);
    }
}

运行完成后,a.txt内容变为12345abcde

异步套接字通道类的使用

AsynchronousServerSocketChannel类是面向流的侦听套接字的异步通道。

AsynchronousSocketChannel类是面向流的连接套接字的异步通道。

使用AsynchronousSocketChannel类的open()方法创建的是未连接状态的AsynchronousSocketChannel对象,之后再使用connect()方法将未连接的AsynchronousSocketChannel变成已连接的AsynchronousSocketChannel对象,详述如下:

  1. 创建AsynchronousSocketChannel是通过调用此类定义的open()方法,新创建的AsynchronousSocketChannel呈已打开但尚未连接的状态。当连接到AsynchronousServerSocketChannel的套接字时,将创建连接的AsynchronousSocketChannel对象。不可能为任意的、预先存在的Socket创建异步套接字通道。
  2. 通过调用connect()方法将未连接的通道变成已连接,连接后该通道保持连接,直到关闭。是否连接套接字通道可以通过调用其getRemoteAddress()方法来确定。尝试在未连接的通道上调用IO操作将导致引发NotYetConnectedExceptionn异常。

如果此类型通道上的一个线程在上一个读操作完成之前启动了read操作,则会引发ReadPendingException异常。类似的,尝试在前一个写操作完成之前启动一个写运算将会引发一个WritePendingException异常。

接受方式1

服务端示例代码:

public class TestAsyncSocketChannelAcceptServer1 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8088));
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {

            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                try{
                    serverSocketChannel.accept(null,this);
                    System.out.println("public void completed ThreadName=" + Thread.currentThread().getName());
                    ByteBuffer buffer = ByteBuffer.allocate(20);
                    Future<Integer> readFuture = result.read(buffer);
                    System.out.println(new String(buffer.array(),0,readFuture.get()));
                    result.close();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }catch (ExecutionException e){
                    e.printStackTrace();
                }catch (IOException e){
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("public void failed");
            }
        });

        // 保证程序一直运行,一直作为服务端监听来自客户端的请求
        while (true){

        }
    }
}

客户端示例代码1:

public class TestAsyncSocketChannelAcceptClient1 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        Socket socket = new Socket("localhost",8088);
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write("我来自客户端1".getBytes());
        outputStream.flush();
        outputStream.close();
    }
}

客户端示例代码2:

public class TestAsyncSocketChannelAcceptClient2 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 8088), null, new CompletionHandler<Void, Object>() {

            @Override
            public void completed(Void result, Object attachment) {
                try {
                    Future<Integer> writeFuture = socketChannel.write(ByteBuffer.wrap("我来自客户端2".getBytes()));
                    System.out.println("写入大小:" + writeFuture.get());
                    socketChannel.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }

            @Override
            public void failed(Throwable exc, Object attachment) {

            }
        });
        Thread.sleep(1000);
    }
}

首先运行TestAsyncSocketChannelAcceptServer1,然后分别运行TestAsyncSocketChannelAcceptClient1和TestAsyncSocketChannelAcceptClient2。运行结果:

public void completed ThreadName=Thread-9
我来自客户端1
public void completed ThreadName=Thread-9
我来自客户端2

从运行结果可以看出,除了main主线程外,还有一个Thread-9线程执行了complted方法

接受方式2

服务端示例代码:

public class TestAsyncSocketChannelAcceptServer2 {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8088));
        System.out.println("A " + System.currentTimeMillis());
        Future<AsynchronousSocketChannel> socketChannelFuture =serverSocketChannel.accept();
        System.out.println("B " + System.currentTimeMillis());
        AsynchronousSocketChannel socketChannel = socketChannelFuture.get();
        System.out.println("C " + System.currentTimeMillis());
        ByteBuffer buffer = ByteBuffer.allocate(20);
        System.out.println("D " + System.currentTimeMillis());
        Future<Integer> readFuture = socketChannel.read(buffer);
        System.out.println("E " + System.currentTimeMillis());
        System.out.println(new String(buffer.array(),0,readFuture.get()));

        System.out.println("F " + System.currentTimeMillis());
        Thread.sleep(40000);
    }
}

首先运行TestAsyncSocketChannelAcceptServer2服务端类,然后运行之前编写的TestAsyncSocketChannelAcceptClient1,运行结果:

A 1577972707936
B 1577972707937
C 1577972712821
D 1577972712821
E 1577972712822
我来自客户端1
F 1577972712822

再次重新运行TestAsyncSocketChannelAcceptServer2,然后运行之前编写的TestAsyncSocketChannelAcceptClient2,运行结果:

A 1577972758315
B 1577972758317
C 1577972784128
D 1577972784128
E 1577972784129
我来自客户端2
F 1577972784129

测试读与重复写出现异常

示例服务端代码为:

public class TestMultiReadExceptionServer {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8088));
        Future<AsynchronousSocketChannel> socketChannelFuture = serverSocketChannel.accept();
        AsynchronousSocketChannel socketChannel = socketChannelFuture.get();
        ByteBuffer buffer = ByteBuffer.allocate(20);
        Future<Integer> readFuture1 = socketChannel.read(buffer);
        // 加上下面这句代码执行时便不会报错
        // int result = readFuture1.get();
        Future<Integer> readFuture2 = socketChannel.read(buffer);
    }
}

首先运行TestMultiReadExeceptionServer服务端类,然后运行之前编写的TestAsyncSocketChannelAcceptClient1类可以看到,运行结果报错,因为read()方法是非阻塞的,所以执行第一个read()方法后立即继续执行第2个read()方法,但由于第一个read()方法并没有完成读的操作,因为并没有调用future.get()方法,因此出现ReadPendingException异常。如果在两次read()方法中间加上int result = readFuture1.get();再次执行就不会报错了。

重复写也是一样的道理,不再赘述。

读数据

public abstract void read(ByteBuffer dst,long timeout,TimeUnit unit,A attacment,CompletionHandler<Integer,? super A> handler)方法的作用是将此通道中的字节序列读入给定的缓冲区。此方法启动一个异步读取操作,以便将该通道中的字节序列读入给定的缓冲区。handler参数是在读取操作完成或失败时调用的CompletionHandler。传递给complted()方法的结果是读取的字节数,如果无法读取字节,则为-1,因为信道已达到end-of-stream。

如果指定了timeout并且在操作完成之前发生超时的情况,则操作将以异常InterruptedByTimeoutException完成。在发生超时的情况下,实现无法保证字节没有被读取,或者不会从通道读取到给定的缓冲区。

正常传输数据的服务端代码:

public class TestRightReadServer {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8088));
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {

            @Override
            public void completed(AsynchronousSocketChannel ch, Object attachment) {

                // 跳转向下一个accept方法
                serverSocketChannel.accept(null,this);
                ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.MAX_VALUE/100);
                ch.read(byteBuffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {

                    @Override
                    public void completed(Integer result, Object attachment) {
                        if (result == -1){
                            System.out.println("客户端没有传输数据就执行close了,到stream end");
                        }

                        if (result == byteBuffer.limit()){
                            System.out.println("服务端获得客户端完整数据");
                        }

                        try{
                            ch.close();
                            System.out.println("服务端close");
                        }catch (IOException e){
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("read public void failed(Throwable exc,void attachment)");
                        System.out.println("exc getMessage()=" + exc.getClass().getName());
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("accept public void failed");
            }
        });

        while (true){

        }
    }
}

正常传输数据的客户端代码:

public class TestRightReadClient {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        final AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 8088), null, new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                try {
                    ByteBuffer buffer = ByteBuffer.allocate(Integer.MAX_VALUE/100);
                    for (int i = 0; i < Integer.MAX_VALUE/100 -3 ; i++) {
                        buffer.put("1".getBytes());
                    }
                    buffer.put("end".getBytes());
                    buffer.flip();
                    int writeSum = 0;
                    // 由于write()方法是异步的,所以执行write()方法后
                    // 并不能100%将数据写出,所以得通过writeLength变量
                    // 来判断具体写出多少字节的数据
                    while (writeSum < buffer.limit()){
                        Future<Integer> writeFuture = socketChannel.write(buffer);
                        Integer writeLength = writeFuture.get();
                        writeSum = writeSum + writeLength;
                    }
                    socketChannel.close();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }catch (IOException e){
                    e.printStackTrace();
                }catch (ExecutionException e){
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("connect public void failed(Throwable exc,Void attachment)");
                System.out.println("exc getMessage()=" + exc.getClass().getName());
            }
        });
        Thread.sleep(10000);
    }
}

首先运行TestRightReadServer,然后运行TestRightReadClient,可以看到,客户端的数据正常传输到了服务端。

写数据

public abstract void write(ByteBuffer src,long timeout,TimeUnit unit,A attachment,CompletionHandler<Integer,? super A> handler)方法的作用是从给定缓冲区向此通道写入一个字节序列。此方法启动异步写入操作,以便从给定缓冲区向此通道写入一个字节序列。handler参数是在写操作完成或失败时调用的CompletionHandler。传递给complted()方法的结果是写入的字节数。

如果指定了timeout,并且在操作完成之前发生了超时,则它将以异常InterrutedByTimeoutException完成。如果发生超时,实现无法保证字节尚未写入或不会从给定的缓冲区写入通道。

服务端示例代码:

public class TestRightWriteServer {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8088));
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel ch, Object attachment) {
                serverSocketChannel.accept(null,this);// 继续下一个accept接作,因为accept方法是异步的
                ByteBuffer buffer = ByteBuffer.allocate(Integer.MAX_VALUE/100);
                ch.read(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        if (result == -1){
                            System.out.println("客户端没有传输数据就执行close了,到stream end");
                        }

                        if (result == buffer.limit()){
                            System.out.println("服务端获得客户端完整数据");
                        }

                        try{
                            ch.close();
                            System.out.println("服务端close");
                        }catch (IOException e){
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("read public void failed(Throwable exc,Void attachment)");
                        System.out.println("exc getMessage()=" + exc.getClass().getName());
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("accept public void failed");
            }
        });

        while (true){

        }
    }
}

客户端示例代码:

public class TestRightWriteClient {

    public static void main(String[] args) throws IOException,InterruptedException, ExecutionException {
        final AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 8088), null, new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                ByteBuffer  buffer = ByteBuffer.allocate(Integer.MAX_VALUE / 100);
                for (int i = 0; i < Integer.MAX_VALUE /100 -3; i++) {
                    buffer.put("1".getBytes());
                }
                buffer.put("end".getBytes());
                buffer.flip();
                socketChannel.write(buffer, 1, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        try {
                            socketChannel.close();
                            System.out.println("client close");
                        }catch (IOException e){
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("write public void failed(Throwable exc,Void attachment)");
                        System.out.println("exc getMessage()=" + exc.getClass().getName());
                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("connect public void failed(Throwable exc,Void attachment)");
                System.out.println("exc getMessage()=" + exc.getClass().getName());
            }
        });

        Thread.sleep(5000);
    }
}

首先运行TestRightWriteServer,然后运行TestRightWriteClient,可以看到运行结果:

服务端获得客户端完整数据
服务端close

同步、异步、阻塞与非阻塞的关系

同步、异步、阻塞和非阻塞可以组合成以下四种排列:

  1. 同步阻塞
  2. 同步非阻塞
  3. 异步阻塞
  4. 异步非阻塞

在使用普通的InputStream、OutputStream类时,就是属于同步阻塞,因为执行当前读写任务一直是当前线程,并且读不到或写不出去就一直是阻塞的状态。阻塞的意思就是方法不返回,直到读到数据或写出数据为止。

NIO技术属于同步非阻塞。当执行"serverSocketChannel.configureBlocking(false)"代码后,也是一直由当前的线程在执行读写操作,但是读不到数据或数据写不出去时读写方法就返回了,继续执行读或写后面的代码。

而异步当然就是指多个线程间的通信。例如,A线程发起一个读操作,这个读操作要B线程进行实现,A线程和B线程就是异步执行了。A线程还要继续做其他的事情,这时B线程开始工作,如果读不到数据,B线程就呈阻塞状态了,如果读到数据,就通知A线程,并且将拿到的数据交给A线程,这种情况是异步阻塞。

最后一种情况是异步非阻塞,是指A线程发起一个读操作,这个读操作要B线程进行实现,因为A线程还要继续做其他的事情,这时B线程开始工作,如果读不到数据,B线程就继续执行后面的代码,直到读到数据时,B线程就通知A线程,并且将拿到的数据交给A线程。

从大的概念上来讲,同步和异步关注的是消息通信机制,阻塞和非阻塞关注的是程序在等待调用结果时的状态。文件通道永远都是阻塞的,不能设置成非阻塞模式。