NIO与网络编程——Selector选择器的使用

PunkLu 2020年01月08日 51次浏览
Selector选择器的使用
# 选择器的使用

NIO技术中最重要的知识点之一:选择器。选择器结合SelectableChannel实现了非阻塞的效果,大大提高了程序运行的效率。选择器实现了I/O通道的多路复用,使用它可以节省CPU资源,提高程序运行效率。

选择器与I/O多路复用

Selector选择器是NIO技术中的核心组件,可以将通道注册进选择器中,主要作用就是使用一个线程来对多个通道中的已就绪通道进行选择,然后就可以对选择的通道进行数据处理,属于一对多的关系,也就是使用一个线程来操作多个通道,这种机制在NIO技术中称为“I/O多路复用”。它的优势是可以节省CPU资源,因为只有一个线程,CPU不需要在不同的线程间进行非常耗时的上下文切换。

在JDK的源码中,创建线程的个数是根据通道的数量来决定的,每注册1023个通道就创建一个新的线程,这些线程执行windows中的select()方法来监测系统Socket的事件,如果发生事件则通知应用层中的main线程终止阻塞,继续向下执行,处理事件。可以在CMD中使用jps和jstack来查看创建线程的数量。

使用I/O多路复用时,这个线程不是以for循环的方式来判断每个通道是否有数据要进行处理,而是以操作系统底层作为“通知器”,来“通知JVM中的进程”哪个通道中的数据需要进行处理。不使用for循环的方式来进行判断,而是使用通知的方式,可以大大提高程序运行的效率,不会出现无限期的for循环迭代空运行。

核心类Selector、SelectionKey和SelectableChannel的关系

使用选择器技术时,主要由3个对象以合作的方式来实现线程选择某个通道进行业务处理,这三个对象分别是Selector、SelectionKey和SelectableChannel的关系。Selector类是抽象类,它是SelectableChannel对象的多路复用器。也就是说只有SelectableChannel通道对象才能被Selector选择器所复用,因为只有SelectableChannel类才具有register(Selector sel,int ops)方法,该方法的作用是将当前的SelectableChannel通道注册到指定的选择器中。由选择器来决定对哪个通道中的数据进行处理,这些能被选择器处理的通道的父类就是SelectableChannel,它是抽象类。SelectableChannel类和FileChannel是平级关系,都继承自父类AbstractInterruptibleChannel。抽象类SelectableChannel有很多子类。

SelectionKey的作用是一个标识,代表SelectableChannel类已经向Selector类注册了。

通道类AbstractInterruptibleChannel与接口InterruptibleChannel的介绍

AbstractInterruptibleChannel实现了InterruptibleChannel接口,该接口的主要作用,是使通道能以异步的方式进行关闭和中断。如果通道实现了asynchronously和closable特性,那么,当一个线程在一个能被中断的通道上出现了阻塞状态,其他线程调用这个通道的close()方法时,这个呈阻塞状态的线程将接收到AsynchronousCloseException异常。

如果通道实现了asynchronously和closeable,并且还实现了interruptible特性,那么当一个线程在一个能被中断的通道上出现了阻塞状态,其他线程调用这个阻塞线程的interrupt()方法时,通道将被关闭,这个阻塞的线程将接收到ClosedByInterruptException异常,这个阻塞线程的状态一直是中断状态。

ServerSocketChannel类、Selector和SelectionKey的使用

通过使用ServerSocketChannel类、Selector和SelectionKey,来实现ServerSocketChannel结合Selelctor达到I/O多路复用的目的。

获得ServerSocketChannel与ServerSocket socket对象

ServerSocketChannel类是抽象的,因此,并不能直接new实例化,但API中提供了public static ServerSocketChannel open()方法来创建ServerSocketChannel类的实例。作用是打开服务器套接字通道。新通道的套接字最初是未绑定的,可以接受连接之前,必须通过它的某个套接字的bind()方法将其绑定到具体的地址。

创建完实例后,可以调用它的public abstract ServerSocket socket()方法返回ServerSocket对象,然后与客户端套接字进行通信。

public final void close()方法的作用是关闭此通道。如果已关闭,则立即返回。否则,它会将该通道标记为已关闭,然后调用implCloseChannel()方法以完成关闭操作。

示例代码:

public class TestServerSocketChannel {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress("localhost",8888));
        Socket socket = serverSocket.accept();
        InputStream inputStream = socket.getInputStream();
        InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
        char[] charArray =  new char[1024];
        int readLength = inputStreamReader.read(charArray);
        while (readLength != -1){
            String newString = new String(charArray,0,readLength);
            System.out.println(newString);
            readLength = inputStreamReader.read(charArray);
        }
        inputStreamReader.close();
        inputStream.close();
        socket.close();
        serverSocket.close();
        serverSocketChannel.close();
    }
}

执行绑定操作

上文使用如下代码:

serverSocket.bind(new InetSocketAddress("localhost",8888));

将ServerSocket类绑定到指定的地址,而ServerSocketChannel类也有bind()方法,该方法的作用是将通道的套接字绑定到本地地址并侦听连接,如果使用该方法绑定了地址,则serverSocket不需要再重新绑定。

阻塞与非阻塞以及accept()方法的使用效果

public abstract SocketChannel accept()方法的作用是接受此通道套接字的连接。如果此通道处于非阻塞模式,那么在不存在挂起的连接时,此方法将直接返回null。否则,在新的连接可用或发生I/O错误之前会无限期地阻塞它。无论此通道地阻塞模式如何,此方法返回的套接字通道(如果有)将处于阻塞模式。

public final SelectableChannel configureBlocking(boolean block)方法的作用是调整此通道的阻塞模式,传入true是阻塞模式,传入false是非阻塞模式。

阻塞特性的示例代码:

public class TestServerSocketChannelBlock {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        System.out.println(serverSocketChannel.isBlocking());
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        System.out.println("begin " + System.currentTimeMillis());
        SocketChannel socketChannel = serverSocketChannel.accept();
        System.out.println(" end " + System.currentTimeMillis());
        socketChannel.close();
        serverSocketChannel.close();
    }
}

运行结果:

true
begin 1578410260873

从输出的结果看,输出了begin没有输出end,说明accept()方法呈阻塞状态。

非阻塞特性的示例代码:

public class TestServerSocketChannelNotBlock {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        System.out.println(serverSocketChannel.isBlocking());
        serverSocketChannel.configureBlocking(false);
        System.out.println(serverSocketChannel.isBlocking());
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        System.out.println("begin " + System.currentTimeMillis());
        SocketChannel socketChannel = serverSocketChannel.accept();
        System.out.println(" end " + System.currentTimeMillis() + " socketChannel=" +socketChannel);
        socketChannel.close();
        serverSocketChannel.close();
    }
}

运行结果:

true
false
begin 1578410322098
 end 1578410322099 socketChannel=null
Exception in thread "main" java.lang.NullPointerException
	at tech.punklu.niosocket.chapter5.TestServerSocketChannelNotBlock.main(TestServerSocketChannelNotBlock.java:28)

在非阻塞模式下,accept()方法在没有客户端连接时,返回null值。

使用accept()方法结合ByteBuffer获取数据的服务端示例代码:

public class TestRecDataWithByteBufferServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        SocketChannel socketChannel = serverSocketChannel.accept();
        ByteBuffer byteBuffer = ByteBuffer.allocate(2);
        int readLength = socketChannel.read(byteBuffer);
        while (readLength != -1){
            String newString = new String(byteBuffer.array());
            System.out.print(newString);
            byteBuffer.flip();
            readLength = socketChannel.read(byteBuffer);
        }
        socketChannel.close();
        serverSocketChannel.close();
    }

}

使用accept()方法结合ByteBuffer获取数据的客户端示例代码:

public class TestRecDataWithByteBufferClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost",8888);
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write("Transfer Data Client".getBytes());
        outputStream.close();
        socket.close();
    }
}

使用ServerSocketChannel类的accept()方法的优势是返回一个SocketChannel通道,此通道是SelectableChannel(可选择通道)的子类,可以把这个SocketChannel通道注册到选择器中实现I/O多路复用,另外,SocketChannel通道使用缓存区进行数据的读取操作。

获得Selector对象

Selector类是抽象的,因此不能直接实例化,需要调用open()方法获得Selector对象。Selector类的public static Selector open()方法的作用是打开一个选择器,使SelectableChannel能将自身注册到这个选择器上。

获得Selectorl类实例的示例代码:

public class TestGetSelector {

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        System.out.println(selector);
    }

}

执行注册操作与获得SelectionKey对象

SelectableChannel类的public final SelectionKey register(Selector sel,int ops)方法的作用是向给定的选择器注册此通道,返回一个选择键(SelectionKey)。

参数sel代表要向其注册此通道的选择器,参数ops代表register()方法的返回值SelectionKey的可用操作集,操作集是在SelectionKey类中以常量的形式进行提供的,如下所示:

  1. static int OP_ACCEPT

    用于套接字接受操作的操作集位

  2. static int OP_CONNECT

    用于套接字连接操作的操作集位

  3. static int OP_READ

    用于读取操作的操作集位

  4. static int OP_WRITE

    用于写入操作的操作集位

示例代码:

public class TestRegister {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 必须将ServerSocketChannel设置成非阻塞的,不然会出现java.nio.channels.IllegalBlockingModeException异常
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress("localhost",8888));

        // 核心代码-开始
        Selector selector = Selector.open();
        SelectionKey key = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        // 核心代码-结束
        System.out.println("selector=" +selector );
        System.out.println("key=" +key );
        serverSocket.close();
        serverSocketChannel.close();
    }
}

判断注册的状态

SelectableChannel类的public final boolean isRegisted()方法的作用是判断此通道当前是否已向任何选择器进行了注册。新创建的通道总是未注册的。由于对SelectionKey执行取消操作和通道进行注销之间有延迟,因此在已取消某个通道的所有SelectionKey后,该通道可能在一定时间内还会保持已注册状态。关闭通道后,该通道可能在一定时间内还会保持已注册状态。

示例代码:

public class TestIsRegisted {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress("localhost",8888));

        System.out.println("A isRegisted=" + serverSocketChannel.isRegistered());

        Selector selector = Selector.open();
        SelectionKey key = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

        System.out.println("B isRegisted=" + serverSocketChannel.isRegistered());

        serverSocket.close();
        serverSocketChannel.close();
    }
}

将通道设置成非阻塞模式再注册到选择器

在将通道注册到选择器之前,必须将通道设置为非阻塞模式。否则会报错。

示例代码:

public class TestNotBlockBeforeRegister {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));

        Selector selector = Selector.open();
        System.out.println("selector=" + selector);

        System.out.println("A serverSocketChannel.isRegistered=" + serverSocketChannel.isRegistered());

        SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

        System.out.println("A serverSocketChannel.isRegistered=" + serverSocketChannel.isRegistered());

        serverSocketChannel.close();
    }
}

可以看到,运行结果报错,因为未设置为非阻塞的。

使用configureBlocking(false)方法解决异常

将通道设置为非阻塞模式可以使用ServerSocketChannel的configureBlocking方法。可在任意时间调用此方法。新的阻塞方式仅影响在此方法返回后发起的操作。对于某些实现,这可能需要阻塞,直到所有挂起的I/O操作已完成。如果调用此方法的同时正在进行另一个此方法或register()方法的调用,则在另一个操作完成前将首先阻塞该调用。

public final boolean isBlocing()方法的作用是判断此通道上的每个I/O操作在完成前是否被阻塞。

示例代码:

public class TestConfigureBlocking {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));

        // ServerSocketChannel通道默认是阻塞的
        System.out.println("A isBlocking=" + serverSocketChannel.isBlocking());
        serverSocketChannel.configureBlocking(false); // 设置成非阻塞模式
        System.out.println("A isBlocking=" + serverSocketChannel.isBlocking());

        Selector selector = Selector.open();
        System.out.println("selector=" + selector);

        System.out.println("A serverSocketChannel.isRegistered()=" + serverSocketChannel.isRegistered());

        SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

        System.out.println("B serverSocketChannel.isRegistered()=" + serverSocketChannel.isRegistered());

        serverSocketChannel.close();

    }
}

判断打开的状态

public final boolean isOpen()方法的作用是判断此通道是否处于打开状态

示例代码为:

public class TestIsOpen {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        System.out.println("A serverSocketChannel.isOpen()=" + serverSocketChannel.isOpen());

        serverSocketChannel.close();
        System.out.println("B serverSocketChannel.isOpen()=" + serverSocketChannel.isOpen());

        serverSocketChannel = ServerSocketChannel.open();
        System.out.println("C serverSocketChannel.isOpen()=" + serverSocketChannel.isOpen());
        serverSocketChannel.close();
    }
}

获得SocketAddress对象

public abstract SocketAddress getLocalAddress()方法的作用是获取绑定的SocketAddress对象。

示例代码为:

public class TestGetLocalAddress {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));

        InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();
        System.out.println(address.getHostString());
        System.out.println(address.getPort());
        serverSocketChannel.close();
    }
}

阻塞模式的判断

public final boolean isBlocking()方法的作用是判断此通道上的每个I/O操作在完成前是否被阻塞。新创建的通道总是处于阻塞模式。

如果此通道已关闭,则此方法返回的值是未指定的。

根据Selector找到对应的SelectionKey

public final SelectionKey keyFor(Selector sel) 方法的作用是获取通道向给定选择器注册到SelectionKey。

同一个SelectableChannel通道可以注册到不同的选择器对象,然后返回新创建的SelectionKey对象,可以使用public final SelectionKey keyFor(Selector sel)方法来取得当前通道注册在指定选择器上的SelectionKey对象。

示例代码:

public class TestKeyFor {


    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel.configureBlocking(false);

        Selector selector = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        System.out.println("A=" + selectionKey1 +" "+ selectionKey1.hashCode());

        SelectionKey selectionKey2 = serverSocketChannel.keyFor(selector);
        System.out.println("B=" + selectionKey2 + " " + selectionKey2.hashCode());
        serverSocketChannel.close();
    }
}

执行Connect连接操作

public abstract boolean connect(SocketAddress remote)方法的作用是连接到远程通道的Socket。如果此通道处于非阻塞模式,则此方法的调用将启动非阻塞连接操作。如果通道呈阻塞模式,则立即发起连接。如果通道呈非阻塞模式,而不是立即发起连接,而是在随后的某个时间才发起连接。如果连接是立即建立的,说明通道是阻塞模式,当连接成功时,则此方法返回true,连接失败出现异常。如果此通道处于阻塞模式,则此方法的调用将会阻塞,直到建立连接或发生I/O错误。如果连接不是立即建立的,说明通道是非阻塞模式,则此方法返回false,并且以后必须通过调用finishConnect()方法来验证连接是否完成。

默认阻塞连接的服务端示例代码:

public class TestConnectServer {


    public static void main(String[] args) throws IOException,InterruptedException {

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.close();
        serverSocketChannel.close();
        System.out.println("server end!");
    }
}

默认阻塞连接的客户端示例代码:

public class TestConnectClient {

    public static void main(String[] args) {
        long beginTime = 0;
        long endTime = 0;
        boolean connectResult = false;
        try {
            // SocketChannel是阻塞模式
            // 在发生错误或连接到目标之前,connect()方法一直是阻塞的
            SocketChannel socketChannel = SocketChannel.open();
            beginTime = System.currentTimeMillis();
            connectResult = socketChannel.connect(new InetSocketAddress("localhost",8888));
            endTime = System.currentTimeMillis();
            System.out.println("正常连接耗时:" + (endTime - beginTime) + " connectResult=" + connectResult);
        }catch (IOException e){
            e.printStackTrace();
            endTime = System.currentTimeMillis();
            System.out.println("异常连接耗时: " + (endTime - beginTime) + " connectResult="  + connectResult);
        }
    }
}

只启动TestConnectClient不启动TestConnectServer会报拒绝连接的错。

使用非阻塞模式连接的客户端示例代码:

public class TestConnectClient2 {

    public static void main(String[] args) throws IOException,InterruptedException {
        long beginTime = 0;
        long endTime = 0;
        SocketChannel socketChannel = SocketChannel.open();

        // SocketChannel是非阻塞模式
        socketChannel.configureBlocking(false);
        beginTime = System.currentTimeMillis();
        boolean connectResult = socketChannel.connect(new InetSocketAddress("localhost",8888));
        endTime = System.currentTimeMillis();
        System.out.println("连接耗时:" + (endTime - beginTime) + " connectResult=" + connectResult);
        Thread.sleep(10000);
        socketChannel.close();
    }
}

只启动TestConnectClient2不启动TestConnectServer会报拒绝连接的错。

判断此通道上是否正在进行连接操作

public abstract boolean isConnectionPending()方法的作用是判断此通道上是否正在进行连接操作。返回值是true代表当且仅当已在此发起连接操作,但是尚未通过调用finishConnect()完成连接。还可以是在通道accept()之后和通道close()之前,isConnectionPending()方法的返回值都是true

验证:首先创建服务器端代码:

public class TestIsConnectionPendingServer {

    public static void main(String[] args) throws IOException,InterruptedException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8088));
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.close();
        serverSocketChannel.close();
        System.out.println("server end!");
    }
}
  1. 阻塞通道,IP不存在的客户端代码:

    public class TestIsConnectingPendingClient1 {
    
        public static void main(String[] args) {
            SocketChannel socketChannel = null;
            try {
                socketChannel = SocketChannel.open();
                System.out.println(socketChannel.isConnectionPending());
                // 192.168.0.123不存在
                socketChannel.connect(new InetSocketAddress("192.168.0.123",8088));
                socketChannel.close();
            }catch (IOException e){
                e.printStackTrace();
                System.out.println("catch " + socketChannel.isConnectionPending());
            }
        }
    }
    

    只运行客户端的运行结果:

    false
    java.net.ConnectException: Connection timed out: connect
    	at sun.nio.ch.Net.connect0(Native Method)
    	at sun.nio.ch.Net.connect(Net.java:454)
    	at sun.nio.ch.Net.connect(Net.java:446)
    	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
    	at tech.punklu.niosocket.chapter5.TestIsConnectingPendingClient1.main(TestIsConnectingPendingClient1.java:28)
    catch false
    
  2. 非阻塞通道,IP不存在的客户端代码:

    public class TestIsConnectingPendingClient2 {
    
        public static void main(String[] args) throws IOException {
            // 非阻塞,IP不存在
            SocketChannel socketChannel = null;
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            System.out.println(socketChannel.isConnectionPending());
            // 192.168.0.123 此ip不存在
            socketChannel.connect(new InetSocketAddress("192.168.0.123",8088));
            System.out.println(socketChannel.isConnectionPending());
            socketChannel.close();
        }
    }
    

    只运行客户端的运行结果:

    false
    true
    

    最后输出值为true,说明非阻塞通道正在建立连接

  3. 阻塞通道,IP存在的客户端代码:

    public class TestIsConnectingPendingClient3 {
    
        public static void main(String[] args) throws IOException {
            // 阻塞,IP存在
            SocketChannel socketChannel = null;
            socketChannel = SocketChannel.open();
            System.out.println(socketChannel.isConnectionPending());
            socketChannel.connect(new InetSocketAddress("localhost",8088));
            System.out.println(socketChannel.isConnectionPending());
            socketChannel.close();
        }
    }
    

    先运行TestIsConnectingPendingServer,再运行此客户端类,运行结果:

    false
    false
    

    最后输出值为false,说明阻塞通道并没有正在建立连接

  4. 非阻塞通道,IP存在的客户端代码:

    public class TestIsConnectingPendingClient4 {
    
        public static void main(String[] args) throws IOException {
            // 非阻塞,IP存在
            SocketChannel socketChannel = null;
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            System.out.println(socketChannel.isConnectionPending());
            socketChannel.connect(new InetSocketAddress("localhost",8888));
            System.out.println(socketChannel.isConnectionPending());
            socketChannel.close();
        }
    }
    

    先运行TestIsConnectingPendingServer,再运行此类,运行结果:

    false
    true
    

    最后输出值为true,说明非阻塞通道正在建立连接

完成套接字通道的连接过程

public abstract boolean finishConnect()方法的作用是完成套接字通道的连接过程。通过将套接字通道置于非阻塞模式,然后调用其connect()方法来发起非阻塞连接操作。如果连接操作失败,则调用此方法将导致抛出IOException。

示例代码为:

public class TestFinishConnectClient {

    public static void main(String[] args) throws IOException,InterruptedException {
        long beginTime = 0;
        long endTime = 0;
        SocketChannel socketChannel = SocketChannel.open();
        // SocketChannel是非阻塞模式
        socketChannel.configureBlocking(false);
        // connect()方法的返回值表示如果建立了连接,则为true
        // 如果此通道处于非阻塞模式且连接操作正在进行中,则为false
        boolean connectResult = socketChannel.connect(new InetSocketAddress("localhost",8088));
        if (connectResult == false){
            System.out.println("connectResult == false");
            while (!socketChannel.finishConnect()){
                System.out.println("一直在尝试连接");
            }
        }
        socketChannel.close();
    }
}

单独运行此程序后的运行结果:

connectResult == false
一直在尝试连接
一直在尝试连接
一直在尝试连接
一直在尝试连接
Exception in thread "main" java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at tech.punklu.niosocket.chapter5.TestFinishConnectClient.main(TestFinishConnectClient.java:35)

客户端一直在使用finishConnect()方法判断连接是否成功,最终检测出客户端连接服务端失败,出现异常。

然后是客户端成功连接服务端的示例代码:

public class TestFinishConnectSuccClient {

    public static void main(String[] args) throws IOException,InterruptedException {
        long beginTime = 0;
        long endTime = 0;
        SocketChannel socketChannel = SocketChannel.open();
        // SocketChannel是非阻塞模式
        socketChannel.configureBlocking(false);
        boolean connectResult = socketChannel.connect(new InetSocketAddress("localhost",8088));
        Thread t = new Thread(){
            @Override
            public void run() {
                try{
                    Thread.sleep(50);
                    ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
                    serverSocketChannel1.bind(new InetSocketAddress("localhost",8088));
                    SocketChannel socketChannel1 = serverSocketChannel1.accept();
                    socketChannel.close();
                    serverSocketChannel1.close();
                    System.out.println("server end!");
                }catch (IOException e){
                    e.printStackTrace();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        };
        t.start();
        if (connectResult == false){
            System.out.println("connectResult == false");
            while (!socketChannel.finishConnect()){
                System.out.println("一直在尝试连接");
            }
        }
        socketChannel.close();
    }
}

从运行结果可以看出,尝试连接成功后,socketChannel.finishConnect()结果为true,停止输出"一直在尝试连接"。

Selector类的作用

Selector类的主要作用是作为SelectableChannel对象的多路复用器。

验证public abstract int select()方法具有阻塞性

public abstract int select()方法的作用是选择一组键,其相应的通道已为I/O操作准备就绪。此方法执行处于阻塞模式的选择操作。仅在至少选择一个通道,调用此选择器的wakeup()方法,或者当前的线程已中断(以先到者为准)后,此方法才返回。返回值代表添加到就绪操作集的键的数目,该数目可能为零,为零代表就绪操作集中的内容并没有添加新的键,保持内容不变。

验证select()方法具有阻塞性的服务端示例代码:

public class TestSelectBlockServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        System.out.println("1");
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        System.out.println("2");
        serverSocketChannel.configureBlocking(false);
        System.out.println("3");
        Selector selector = Selector.open();
        System.out.println("4");
        SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        System.out.println("5");
        int keyCount = selector.select();
        System.out.println("6 keyCount= " + keyCount);
        serverSocketChannel.close();
        System.out.println("7 End");
    }
}

验证select()方法具有阻塞性的客户端示例代码:

public class TestSelectBlockClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost",8888);
        socket.close();
    }
}

先执行TestSelectBlockServer再执行TestSelectBlockClient,可以看到,在执行了TestSelectBlockClient后才打印了6 keyCount= 1和7 End,说明Select.select()方法具有阻塞性。

select()方法不阻塞后继续向下执行,进程结束,但随后客户端也连接不到服务端了,因为服务端进程已经销毁。在大多数情况下,服务端的进行并不需要销毁,因此,就要使用while(true)无限循环来无限地接受客户端的请求。但在这个过程中,有可能出现select()方法不出现阻塞的情况,造成的结果就是真正地出现“死循环”了。

select()方法不阻塞的原因和解决方法

在某些情况下,select()方法是不阻塞的。

select()方法不阻塞的服务端代码:

public class TestSelectNotBlockServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel.configureBlocking(false);
        Selector selector1 = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel.register(selector1,SelectionKey.OP_ACCEPT);
        boolean isRun = true;
        while (isRun == true){
            int keyCount = selector1.select();
            Set<SelectionKey> set1 = selector1.keys();
            Set<SelectionKey> set2 = selector1.selectedKeys();
            System.out.println("keyCount=" + keyCount);
            System.out.println("set1 size=" + set1.size());
            System.out.println("set2 size=" + set2.size());
            System.out.println();
        }
        serverSocketChannel.close();
    }
}

select()方法不阻塞的客户端代码:

public class TestSelectNotBlockClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost",8888);
        socket.close();
    }
}

首先运行TestSelectNotBlockServer,然后运行TestSelectNotBlockClient,控制台将出现死循环,出现死循环的原因是客户端连接服务端时,服务端中的通道对accept事件并未处理,导致accept事件一直存在,也就是select()方法一直检测到有准备好的通道要对accept事件进行处理,但一直未处理,就一直呈“死循环”输出的状态了。解决“死循环”的办法是将accept事件消化处理。

解决死循环的服务端代码:

public class TestConfigueSelectBlockServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel.configureBlocking(false);
        Selector selector1 = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel.register(selector1,SelectionKey.OP_ACCEPT);
        boolean isRun = true;
        while (isRun == true){
            int keyCount = selector1.select();
            Set<SelectionKey> set1 = selector1.keys();
            Set<SelectionKey> set2 = selector1.selectedKeys();
            System.out.println("keyCount=" + keyCount);
            System.out.println("set1 size=" + set1.size());
            System.out.println("set2 size=" + set2.size());
            System.out.println();
            Iterator<SelectionKey> iterator = set2.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                channel.accept(); // 使用accept()方法将事件处理掉
            }
        }
        serverSocketChannel.close();
    }

}

可以看到这个服务端代码与之前的TestSelectNotBlockServer不同的是,在while循环中增加了accept()方法,对选择器中注册的通道进行了处理。首先运行新建的TestConfigueSelectBlockServer,然后运行之前编写的TestSelectNotBlockClient。可以看到,死循环已经不再出现了。

出现重复消费的情况

如果两个不同的通道注册到相同的选择器,那么极易出现重复消费的情况。

重复消费的服务端示例代码:

public class TestRepeatConsumeServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost",7777));
        serverSocketChannel1.configureBlocking(false);

        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        serverSocketChannel2.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel2.configureBlocking(false);

        Selector selector1 = Selector.open();

        SelectionKey selectionKey1 = serverSocketChannel1.register(selector1,SelectionKey.OP_ACCEPT);
        SelectionKey selectionKey2 = serverSocketChannel2.register(selector1,SelectionKey.OP_ACCEPT);

        boolean isRun = true;
        while (isRun == true){
            int keyCount = selector1.select();
            Set<SelectionKey> set1 = selector1.keys();
            Set<SelectionKey> set2 = selector1.selectedKeys();
            System.out.println("keyCount = " + keyCount);
            System.out.println("set1 size = " + set1.size());
            System.out.println("set2 size = " + set2.size());
            Iterator<SelectionKey> iterator = set2.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel == null){
                    System.out.println("打印这条信息证明是连接8888服务器时,重复消费的情况发生,");
                    System.out.println("将7777关联的SelectionKey对应的SocketChannel通道取出来,");
                    System.out.println("但是值为null,socketChannel == null。");
                }
                InetSocketAddress ipAddress = (InetSocketAddress)serverSocketChannel.getLocalAddress();
                System.out.println(ipAddress.getPort() + " 被客户端连接了!");
                System.out.println();
            }
        }
        serverSocketChannel1.close();
        serverSocketChannel2.close();
    }
}

重复消费的客户端示例代码1:

public class TestRepeatConsumeClient1 {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost",7777);
        socket.close();
    }
}

重复消费的客户端示例代码2:

public class TestRepeatConsumeClient2 {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost",8888);
        socket.close();
    }
}

首先运行TestRepeatConsumeServer,然后运行TestRepeatConsumeClient1,再运行TestRepeatConsumeClient2.从TestRepeatConsumeServer控制台的输出结果可以看出,对消息进行了重复消费.这是因为TestRepeatConsumeServer中的set2变量在每一次循环中使用的是底层提供的同一个对象,一直在往set2里面添加已就绪的SelectionKey,一个是关联7777端口的SelectionKey,另一个是关联8888端口的SelectionKey.在这期间,从未从set2中删除Selectionkey,因此,set2的size值为2,再使用while(iterator.hasNext())对set2循环两次,就导致了重复消费.解决重复消费问题的方法就是使用remove()方法删除set2中处理过后的SelectionKey。

使用remove()方法解决重复消费问题

解决重复消费问题的服务端代码:

public class TestRemoveServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost",7777));
        serverSocketChannel1.configureBlocking(false);

        ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
        serverSocketChannel2.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel2.configureBlocking(false);

        Selector selector1 = Selector.open();

        SelectionKey selectionKey1 = serverSocketChannel1.register(selector1,SelectionKey.OP_ACCEPT);
        SelectionKey selectionKey2 = serverSocketChannel2.register(selector1,SelectionKey.OP_ACCEPT);

        boolean isRun = true;
        while (isRun == true){
            int keyCount = selector1.select();
            Set<SelectionKey> set1 = selector1.keys();
            Set<SelectionKey> set2 = selector1.selectedKeys();
            System.out.println("keyCount= " + keyCount);
            System.out.println("Set1 size= " + set1.size());
            System.out.println("Set2 size=" + set2.size());
            Iterator<SelectionKey> iterator = set2.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                channel.accept();
                InetSocketAddress ipAddress = (InetSocketAddress) channel.getLocalAddress();
                System.out.println(ipAddress.getPort() + "被客户端连接了");
                System.out.println();
                iterator.remove();// 删除当前的SelectionKey
            }
        }
        serverSocketChannel1.close();
        serverSocketChannel2.close();
    }
}

首先运行TestRemoveServer,然后顺序运行之前编写的TestRepeatConsumeClient1和TestRepeatConsumeClient2,可以看到,这次运行TestRepeatConsumeClient2时,并没有出现重复消费的情况.因为在TestRemoveServer里已经对使用完的SelectionKey进行了remove处理.

从已就绪的键集中获得通道中的数据

获得通道中数据的服务端代码:

public class TestGetDataFromSelectorServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        boolean isRun = true;
        while (isRun == true){
            int keyCount = selector.select();
            Set<SelectionKey> set = selector.selectedKeys();
            Iterator<SelectionKey> iterator = set.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isAcceptable()){
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    ServerSocket serverSocket = channel.socket();
                    Socket socket = serverSocket.accept();
                    InputStream inputStream = socket.getInputStream();
                    byte[] byteArray = new byte[1000];
                    int readLength = inputStream.read(byteArray);
                    while (readLength != -1){
                        String newString = new String(byteArray,0,readLength);
                        System.out.println(newString);
                        readLength = inputStream.read(byteArray);
                    }
                    inputStream.close();
                    socket.close();

                    iterator.remove(); // 删除
                }
            }
        }
        serverSocketChannel.close();
    }
}

获得通道中数据的客户端的代码:

public class TestGetDataFromSelectorClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost",8888);
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write("我来自客户端!".getBytes());
        socket.close();
    }
}

首先运行TestGetDataFromSelectorServer,然后再运行TestGetDataFromSelectorClient,可以看到TestGetDataFromSelectorServer的控制台上已经打印出了相应的客户端发送的信息。

public abstract int select(long timeout)方法的使用

public abstract int select(long timeout)方法的作用是选择一组键,其相应的通道已为I/O操作准备就绪.此方法执行处于阻塞模式的选择操作.仅在至少选择一个通道,调用此选择器的wakeup()方法,当前的线程已中断,或者给定的超时期满(以先到者为准)后,此方法才返回.此方法不提供实时保证,参数timeout代表如果为正,则在等待某个通道准备就绪时最多阻塞timeout毫秒.如果为零,则无限期地阻塞;必须为负数.返回值代表已更新其准备就绪操作集的键的数目,该数目可能为零.

SelectionKey类的使用

SelectionKey类表示SelectableChannel在选择器中的注册的标记.

在每次向选择器注册通道时,就会创建一个选择器(SelectionKey).通过调用某个键的cancel()方法,关闭其通道,或者通过关闭其选择器取消该键之前,通道一直保持有效。取消某个键不会立即从其选择器中移除它,而是将该键添加到选择器的已取消键集,以便在下一次进行select()方法操作时移除它.可通过调用某个键的isValid()方法来测试其有效性。

判断是否允许连接SelectableChannel对象

public final boolean isAcceptable()方法的作用是测试此键的通道是否已准备好接受新的套接字连接.调用此方法的形式为k.isAcceptable(),该调用与以下调用的作用完全相同:k.readyOps() & OP_ACCEPT != 0.如果此键的通道不支持套接字连接操作,则此方法始终返回false.返回值当且仅当readyOps() & OP_ACCEPT为非零值时才返回true。

isAcceptable()方法的示例代码:

public class TestIsAcceptableServer {

    public static void main(String[] args) throws IOException,InterruptedException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel1.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT);
        boolean isRun = true;
        while (isRun == true){
            selector.select();
            Set<SelectionKey> selectionKeysSet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeysSet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                Socket socket = null;
                if (key.isAcceptable()){
                    socket = channel.socket().accept();
                    System.out.println("server isAcceptable()");
                }
                socket.close();
                iterator.remove();
            }
        }
        serverSocketChannel1.close();
    }
}

public final boolean isConnectable()方法的作用是测试此键的通道是否已完成其套接字连接操作.调用此方法的形式为k.isConnectable(),该调用与以下调用的作用完全相同:k.readyOps()&OP_CONNECT != 0.如果此键的通道不支持套接字连接操作,则此方法始终返回false.返回值当且仅当readyOps() & OP_CONNECT为非零值时才返回true。

isConnectable()方法的示例代码:

public class TestIsConnectableServer {

    public static void main(String[] args) throws IOException,InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey1 = socketChannel.register(selector,SelectionKey.OP_ACCEPT);
        boolean isRun = true;
        while(isRun == true){
            int keyCount = selector.select();
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isConnectable()){
                    System.out.println("client isConnectable()");
                    // 需要在此处使用finishConnect()方法完成连接
                    // 因为socketChannel是非阻塞模式
                    while (!socketChannel.finishConnect()){
                        System.out.println("!socketChannel.finishConnect()");
                    }
                    SocketChannel channel = (SocketChannel)key.channel();
                    channel.close();
                }
                iterator.remove();
            }
        }
        socketChannel.close();
        System.out.println("");
    }
}

判断是否已准备好进行读取

public final boolean isReadable()方法的作用是测试此键的通道是否已准备好进行读取.调用此方法的形式为k.isReadable(),该调用与以下调用的作用完全相同:k.readyOps()&OP_READ != 0.如果此键的通道不支持读取操作,则此方法始终返回false。

服务端示例代码为:

public class TestIsReadableServer {

    public static void main(String[] args) throws IOException,InterruptedException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost",8088));
        serverSocketChannel1.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT);
        SocketChannel socketChannel = null;
        boolean isRun = true;
        while (isRun == true){
            selector.select();
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isAcceptable()){
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    System.out.println("server isAcceptable()");
                    socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);
                    // 对socketChannel注册读的事件
                    socketChannel.register(selector,SelectionKey.OP_READ);
                }

                if (key.isReadable()){
                    System.out.println("server isReadable()");
                    ByteBuffer buffer = ByteBuffer.allocate(1000);
                    int readLength = socketChannel.read(buffer);
                    while (readLength != -1){
                        String newString = new String(buffer.array(),0,readLength);
                        System.out.println(newString);
                        readLength = socketChannel.read(buffer);
                    }
                    socketChannel.close();
                }
                iterator.remove();
            }
        }
        serverSocketChannel1.close();
    }
}

客户端示例代码为:

public class TestIsReadableClient {

    public static void main(String[] args) throws InterruptedException{
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            SelectionKey selectionKey1 = socketChannel.register(selector,SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress("localhost",8088));
            int keyCount = selector.select();
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isConnectable()){
                    // 需要在此处使用finishConnect()方法完成连接,因为socketChannel是非阻塞模式
                    while (!socketChannel.finishConnect()){
                        System.out.println("!socketChannel.finishConnect()-------");
                    }
                    System.out.println("client isConnectable()");
                    SocketChannel channel = (SocketChannel)key.channel();
                    byte[] writeData = "我来自客户端,你好,服务器!".getBytes();
                    ByteBuffer buffer = ByteBuffer.wrap(writeData);
                    channel.write(buffer);
                    channel.close();
                }
            }
            System.out.println("client end!");
        }catch (ClosedChannelException e){
            e.printStackTrace();
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

可以看出,在TestIsReadableServer中先进行了isAcceptable()的判断,然后调用accept()方法获取,并在之后注册OP_READ类型的事件,为之后的读取做准备,然后判断isReadable(),如果可读取,则读取通道中的内容并输出。

首先运行TestIsReadableServer,然后运行TestIsReadableClient。可以看到TestIsReadableServer的控制台上打印出了以下内容:

server isAcceptable()
server isReadable()
我来自客户端,你好,服务器!

TestIsReadableClient的控制台上打印出了以下内容:

client isConnectable()
client end!

判断是否已准备好进行写入

public final boolean isWritable()方法的作用是测试此键的通道是否已准备好进行写入。调用此方法的形式为k.isWritable(),该调用与以下调用的作用完全相同:k.readyOps()& OP_WRITE != 0。如果此键的通道不支持写入操作,则此方法始终返回false。

isWritable()方法的服务端示例代码:

public class TestIsWritableServer {

    public static void main(String[] args) throws IOException,InterruptedException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel1.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey = serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT);
        SocketChannel socketChannel = null;
        boolean isRun = true;
        while (isRun == true){
            selector.select();
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isAcceptable()){
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    System.out.println("server isAcceptable()");
                    socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ);
                }

                if (key.isReadable()){
                    System.out.println("server isReadable()");
                    ByteBuffer buffer = ByteBuffer.allocate(1000);
                    int readLength = socketChannel.read(buffer);
                    while (readLength != -1){
                        String newString =  new String(buffer.array(),0,readLength);
                        System.out.println(newString);
                        readLength = socketChannel.read(buffer);
                    }
                    socketChannel.close();
                }
                iterator.remove();

            }
        }
        serverSocketChannel1.close();
    }
}

isWritable()方法的客户端示例代码:

public class TestIsWritableClient {

    public static void main(String[] args) throws IOException,InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey1 = socketChannel.register(selector,SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("localhost",8888));

        boolean isRun = true;
        while (isRun == true){
            int keyCount = selector.select();
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isConnectable()){
                    System.out.println("client isConnectable()");
                    if (socketChannel.isConnectionPending()){
                        while (!socketChannel.finishConnect()){
                            System.out.println("!socketChannel.finishConnect()----");
                        }
                        socketChannel.register(selector,SelectionKey.OP_WRITE);
                    }
                }
                if (key.isWritable()){
                    System.out.println("client isWritable()");
                    byte[] writeData = "我来自客户端,你好,服务器!".getBytes();
                    ByteBuffer buffer = ByteBuffer.wrap(writeData);
                    socketChannel.write(buffer);
                    socketChannel.close();
                }
            }
        }
        System.out.println("client end!");
    }
}

可以看到在TestIsWritableClient中,先是判断是否可连接,若是可连接,则将类型为SelectionKey.OP_WRITE的键事件注册到选择器上,然后判断是否可写入,如果可写入的话,将准备好的数据写入构造好的ByteBuffer缓冲区,并将缓冲区写入通道。

首先运行TestIsWritableServer,然后运行TestIsWritableClient。可以在TestIsWritableServer的控制台上看到以下输出:

server isAcceptable()
server isReadable()
我来自客户端,你好,服务器!

可以在TestIsWritableClient的控制台上看到以下输出:

client isConnectable()
client isWritable()

返回SelectionKey关联的选择器

public abstract Selector selector()方法的作用是返回SelectionKey关联的选择器,即使已取消该键,此方法仍将继续返回选择器。

示例代码为:

public class TestSelectorServer {

    public static void main(String[] args) throws IOException,InterruptedException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel1.configureBlocking(false);
        Selector selector1 = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel1.register(selector1,SelectionKey.OP_ACCEPT);

        Selector selector2 = selectionKey1.selector();
        System.out.println(selector1  + " " + selector1.hashCode());
        System.out.println(selector2  + " " + selector2.hashCode());
        serverSocketChannel1.close();
    }
}

运行可看到,输出的两个对象的值相同,说明是同一个对象

在注册操作时传入attachment附件

SelectableChannel类中的public final SelectionKey register(Selector sel,int ops,Object att)方法的作用是向给定的选择器注册此通道,返回一个选择键。而SelectionKey类中的public final Object attachment()方法的作用是获取当前的附加对象。返回值代表当前已附加到此键的对象,如果没有附加对象,则返回null。

附件的服务端示例代码为:

public class TestAttachmentServer {

    public static void main(String[] args) throws IOException,InterruptedException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.bind(new InetSocketAddress("localhost",8888));
        serverSocketChannel1.configureBlocking(false);
        Selector selector =  Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT);
        SocketChannel socketChannel = null;
        boolean isRun = true;
        while (isRun == true){
            selector.select();
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isAcceptable()){
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    System.out.println("server isAcceptable()");
                    socketChannel = channel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ);
                }

                if (key.isReadable()){
                    System.out.println("server isReadable()");
                    ByteBuffer buffer = ByteBuffer.allocate(1000);
                    int readLength = socketChannel.read(buffer);
                    while (readLength != -1){
                        String newString =  new String(buffer.array(),0,readLength);
                        System.out.println(newString);
                        readLength = socketChannel.read(buffer);
                    }
                    socketChannel.close();
                }
                iterator.remove();
            }
        }
        serverSocketChannel1.close();
    }
}

附件的客户端示例代码为:

public class TestAttachmentClient {

    public static void main(String[] args) throws IOException,InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey1 = socketChannel.register(selector,SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("localhost",8888));
        boolean isRun = true;
        while (isRun == true){
            int keyCount = selector.select();
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isConnectable()){
                    System.out.println("client isConnectable()");
                    if (socketChannel.isConnectionPending()){
                        while (!socketChannel.finishConnect()){
                            System.out.println("!socketChannel.finishConnect()-----");
                        }
                        socketChannel.register(selector,SelectionKey.OP_WRITE,"我使用附件进行注册,我来自客户端,你好服务端!");
                    }
                }
                if (key.isWritable()){
                    System.out.println("client isWritable()");
                    ByteBuffer buffer = ByteBuffer.wrap(((String)key.attachment()).getBytes());
                    socketChannel.write(buffer);
                    socketChannel.close();
                }
            }
        }
        System.out.println("client end!");
    }
}

可以看到,在TestAttachmentClient里,最开始的代码和之前的一样,只是在将SocketChannel对象注册到选择器时,多了一个参数,往这个参数里传了字节数组,并在下面的代码中,使用key.attachment()方法获取出来,传入通道。

首先运行TestAttachmentServer,再运行TestAttachmentClient。可以看到TestAttachmentServer的控制台输出了以下内容:

server isAcceptable()
server isReadable()
我使用附件进行注册,我来自客户端,你好服务端!

而TestAttachmentClient的控制台输出了以下内容:

client isConnectable()
client isWritable()

向SelectionKey中设置附件

之前是在向选择器注册通道时传入附件,还可以向SelectionKey键中设置附件。

public final Object attach(Object ob)方法的作用是将给定的对象附加到此键。之后可通过attachment()方法获取已附加的对象。一次只能附加一个对象。调用此方法会导致丢弃所有以前的附加对象。通过附加null可丢弃当前的附加对象。返回值代表先前已附加的对象(如果有),否则返回null。

示例代码为:

public class TestSelectionKeyAttachmentClient {

    public static void main(String[] args) throws IOException,InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey1 = socketChannel.register(selector,SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("localhost",8888));
        boolean isRun = true;
        while (isRun == true){
            int keyCount = selector.select();
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                if (key.isConnectable()){
                    System.out.println("client isConnectable()");
                    if (socketChannel.isConnectionPending()){
                        while (!socketChannel.finishConnect()){
                            System.out.println("!socketChannel.finishedConnect()-------");
                        }
                        socketChannel.register(selector,SelectionKey.OP_WRITE);
                        // 追加附件数据
                        key.attach("我使用attach(Object)进行注册,我来自客户端,你好服务端!");
                    }
                }
                if (key.isWritable()){
                    System.out.println("client isWritable()");
                    ByteBuffer buffer = ByteBuffer.wrap(((String) key.attachment()).getBytes());
                    socketChannel.write(buffer);
                    socketChannel.close();
                }
            }
        }
        System.out.println("client end!");
    }
}

可以看到与之前的TestAttachmentClient不同的是这次是向SelectionKey中设置附件。

首先运行之前编写的TestAttachmentServer,再运行这次的TestSelectionKeyAttachmentClient。可以看到TestAttachmentServer的控制台输出了以下内容:

server isAcceptable()
server isReadable()
我使用attach(Object)进行注册,我来自客户端,你好服务端!

判断此键是否有效

public abstract boolean isValid()方法的作用是告知此键是否有效。键在创建时是有效的,并在被取消、其通道已关闭或者选择器已关闭之前保持有效。返回值当且仅当此键有效时才返回true。

示例代码:

public class TestIsValid {

    public static void main(String[] args) throws IOException,InterruptedException {
        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
        serverSocketChannel1.configureBlocking(false);
        Selector selector = Selector.open();
        SelectionKey selectionKey1 = serverSocketChannel1.register(selector,SelectionKey.OP_ACCEPT);
        System.out.println(selectionKey1.isValid());
        selectionKey1.cancel();
        System.out.println(selectionKey1.isValid());
        serverSocketChannel1.close();
    }
}

取消操作

public abstract void cancel()方法的作用是请求取消此键的通道到其选择器的注册。一旦返回,该键就是无效的,并且将被添加到其选择器的已取消键集中。在进行下一次选择操作时,将从所有选择器的键集中移除该键。如果已取消了此键,则调用此方法无效。一旦取消某个键,SelectionKey.isValid()方法返回false。可在任意时间调用cancel()方法。此方法与选择器的已取消键集保持同步。因此,如果通过涉及同一选择器的取消或选择操作并发调用它,可能会暂时受阻塞。