Skip to content

网络NIO

1. ServerSocketChannel

ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象,它本身从不传输数据, 也就是本身不实现读和写功能。ServerSocketChannel是一个基于通道的socket监听器,和java.net.ServerSocket作用一样.

1.1 阻塞模式

阻塞模式下,相关方法都会导致线程暂停

  • ServerSocketChannel.accept()会在没有连接建立时让线程暂停
  • SocketChannel.read()会在没有数据可读时让线程暂停
  • 阻塞的表现其实就是线程暂停了,暂停期间不会占用cpu,但线程相当于处于闲置

单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持。但多线程下,有新的问题,体现在以下方面:

  • 32位jvm一个线程320k,64位jvm一个线程1024k,如果连接数过多,必然导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
  • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
java
public class Test01 {
    
    static Logger log = LoggerFactory.getLogger(Test01.class);

    public static void main(String[] args) throws IOException {
        // 0. ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 1. 创建了服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 2. 绑定监听端口
        ssc.bind(new InetSocketAddress(8090));
        // 3. 连接集合
        List<SocketChannel> channels = new ArrayList<>();
        // 放在while里面, 客户端可以不断连接过来
        while (true) {
            // 4. accept 建立与客户端连接, SocketChannel用来与客户端之间通信
            log.debug("connecting...");
            SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行
            log.debug("connected... {}", sc);
            channels.add(sc);
            for (SocketChannel channel : channels) {
                // 5. 接收客户端发送的数据
                log.debug("before read... {}", channel);
                channel.read(buffer); // 阻塞方法,线程停止运行
                buffer.flip();
                buffer.clear();
                log.debug("after read...{}", channel);
            }
        }
    }
}

1.2 非阻塞模式

非阻塞模式下,相关方法都会不会让线程暂停

  • 在ServerSocketChannel.accept()在没有连接建立时,会返回null,继续运行
  • SocketChannel.read在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannel的read或是去执行ServerSocketChannel.accept
  • 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去
  • 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了cpu
  • 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)
java
public class Test02 {

    static Logger log = LoggerFactory.getLogger(Test02.class);

    public static void main(String[] args) throws IOException {
        // 使用nio来理解非阻塞模式, 单线程
        // 0. ByteBuffer
        ByteBuffer buffer = ByteBuffer.allocate(16);
        // 1. 创建了服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false); // 非阻塞模式
        // 2. 绑定监听端口
        ssc.bind(new InetSocketAddress(8090));
        // 3. 连接集合
        List<SocketChannel> channels = new ArrayList<>();
        while (true) {
            // 4. accept建立与客户端连接, SocketChannel用来与客户端之间通信
            SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
            if (sc != null) {
                log.debug("connected... {}", sc);
                // 设置非阻塞模式,可以使得read()不再阻塞的了
                sc.configureBlocking(false);
                channels.add(sc);
            }
            for (SocketChannel channel : channels) {
                // 5. 接收客户端发送的数据
                int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,返回0
                if (read > 0) {
                    buffer.flip();
                    log.debug("read...{}", new String(buffer.array(), 0, read));
                    buffer.clear();
                    log.debug("after read...{}", channel);
                }
            }
        }
    }
}

cmd窗口输入: telent 127.0.0.1 8090, 运行结果:
Alt text 客户端接收到服务端返回消息 Alt text 非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了cpu,另外数据复制过程中,线程实际还是阻塞的。

1.3 使用selector

java
public class Test03 {

    static Logger log = LoggerFactory.getLogger(Test03.class);

    public static void main(String[] args) throws IOException {
        log.debug("starting...");
        // 1. 创建了服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false); // 非阻塞模式
        // 2. 绑定监听端口
        ssc.bind(new InetSocketAddress(8090));
        Selector selector = Selector.open();
        /* 一共有4个事件:
        * accept(接收客户端连接触发),
        * connect(客户端用的:和服务端建立连接触发),
        * read(可读事件),
        * write(可写事件)
        */
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // 轮询选择器
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 自己删除key,防止重复处理
                iterator.remove();
                if (key.isAcceptable()) {
                    log.debug("connected... {}", key);
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    // 设置非阻塞模式,可以使得read()不再阻塞的了
                    sc.configureBlocking(false);
                    SelectionKey selectionKey = sc.register(selector, SelectionKey.OP_READ, null);
                    log.debug("scKey: {}", selectionKey);
                } else if (key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // ByteBuffer
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    // 5. 接收客户端发送的数据
                    int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,返回0
                    if (read > 0) {
                        log.debug("read...{}", new String(buffer.array(), 0, read));
                        buffer.flip();
                        buffer.clear();
                        log.debug("after read...{}", channel);
                    }else{// 断开连接返回值为-1
                        log.debug("read...{}", read);
                        // 客户端断开连接处理
                        key.cancel();
                    }
                }
            }
        }
    }
}

为何要执行iter.remove()

因为select在事件发生后,就会将相关的key放入selectedKeys集合,但不会在处理完后从selectedKeys集合中移除,需要我们自己编码删除。例如

  • 第一次触发了ssckey上的accept事件,没有移除ssckey
  • 第二次触发了sckey上的read事件,但这时selectedKeys中还有上次的ssckey,在处理时因为没有真正的 serverSocket连上了,就会导致空指针异常

3. SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道,主要用途用来处理网络I/O的通道。SocketChannel可以被多路复用。

3.1 SocketChannel特点

  1. 对于已经存在的socket不能创建SocketChannel
  2. SocketChannel中提供的open接口创建的Channel并没有进行网络级联,需要使用connect接口连接到指定地址
  3. 未进行连接的SocketChannle执行I/O操作时,会抛出NotYetConnectedException
  4. SocketChannel支持两种I/O模式:阻塞式和非阻塞式
  5. SocketChannel支持异步关闭。如果SocketChannel在一个线程上read阻塞,另一个线程对该SocketChannel调用shutdownInput,则读阻塞的线程将返回-1表示没有读取任何数据;如果SocketChannel在一个线程上write阻塞,另一个线程对该SocketChannel调用shutdownWrite,则写阻塞的线程将抛出AsynchronousCloseException
  6. SocketChannel 支持设定参数:
    • SO_SNDBUF 套接字发送缓冲区大小
    • SO_RCVBUF 套接字接收缓冲区大小
    • SO_KEEPALIVE 保活连接
    • O_REUSEADDR 复用地址
    • SO_LINGER 有数据传输时延缓关闭Channel(只有在非阻塞模式下有用)
    • TCP_NODELAY 禁用Nagle算法

3.2 SocketChannel的使用

java
public static void main(String[] args) throws IOException {
    // 创建 SocketChannel
    // 方式一
    // SocketChannel socketChanne2 = SocketChannel.open();
    // socketChanne2.connect(new InetSocketAddress("www.baidu.com", 80));
    // 方式二
    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.sina.com", 80));
    boolean open = socketChannel.isOpen();// 测试 SocketChannel 是否为 open 状态
    System.out.println("=======open============" + open);
    boolean connected = socketChannel.isConnected();//测试 SocketChannel 是否已经被连接
    System.out.println("=========connected==========" + connected);
    boolean connectionPending = socketChannel.isConnectionPending();//测试 SocketChannel 是否正在进行连接
    System.out.println("========connectionPending===========" + connectionPending);
    boolean finishConnect = socketChannel.finishConnect();//校验正在进行套接字连接的 SocketChannel是否已经完成连接
    System.out.println("=========finishConnect==========" + finishConnect);
    socketChannel.configureBlocking(false);
    ByteBuffer byteBuffer = ByteBuffer.allocate(32);
    socketChannel.read(byteBuffer);
    Boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
    Integer soRcvbuf = socketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
    // 反转读写模式
    byteBuffer.flip();
    while(byteBuffer.hasRemaining()) {
        System.out.println(byteBuffer.get());
    }
    socketChannel.close();
    System.out.println("read over");
    System.out.println(keepAlive);
    System.out.println(soRcvbuf);
}

4. DatagramChannel

使用DatagramChannel来处理UDP的数据传输。和TCP不同,UDP不是面向连接的协议。使用UDP时,只要知道服务器的IP和端口就可以直接向对方发送数据。

java
@Test
public void testClient() throws IOException {
   // 打开 DatagramChannel
   DatagramChannel receiveChannel = DatagramChannel.open();
   //设置为非阻塞模式
   receiveChannel.configureBlocking(false);
   receiveChannel.bind(new InetSocketAddress(10086));
   ByteBuffer receiveBuffer = ByteBuffer.allocate(512);
   while (true) {
      receiveBuffer.clear();
      // 通过 receive()接收 UDP 包
      SocketAddress receiveAddr = receiveChannel.receive(receiveBuffer);
      receiveBuffer.flip();
      System.out.print(receiveAddr.toString() + " ");
      System.out.println(Charset.forName("UTF-8").decode(receiveBuffer));
   }
}

@Test
// 发送数据
public void testServer() throws IOException, InterruptedException {
   // 打开 DatagramChannel
   DatagramChannel server = DatagramChannel.open();
   //设置为非阻塞模式
   server.configureBlocking(false);
   while (true){
      // 通过send()发送UDP包
      server.send(ByteBuffer.wrap("server msg:  send".getBytes()), new InetSocketAddress("127.0.0.1",10086));
      System.out.println("发包端发包");
      Thread.sleep(1000);
   }
}

5. 处理read事件

💡 为何要 iter.remove()

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常
java
public class ChannelDemo6 {
    public static void main(String[] args) {
        try (ServerSocketChannel channel = ServerSocketChannel.open()) {
            channel.bind(new InetSocketAddress(8080));
            System.out.println(channel);
            Selector selector = Selector.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                int count = selector.select();
                log.debug("select count: {}", count);
                // 获取所有事件
                Set<SelectionKey> keys = selector.selectedKeys();

                // 遍历所有事件,逐一处理
                Iterator<SelectionKey> iter = keys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 判断事件类型
                    if (key.isAcceptable()) {
                        ServerSocketChannel c = (ServerSocketChannel) key.channel();
                        // 必须处理
                        SocketChannel sc = c.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                        log.debug("连接已建立: {}", sc);
                    } else if (key.isReadable()) {
                        SocketChannel sc = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(128);
                        int read = sc.read(buffer);
                        if(read == -1) {
                            key.cancel();
                            sc.close();
                        } else {
                            buffer.flip();
                            debug(buffer);
                        }
                    }
                    // 处理完毕,必须将事件移除
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

💡 cancel 的作用

cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件

5.1 处理消息的边界

如果我们读消息的时候,设置的ByteBuffer过小,则可能出现乱码的情况:
Alt text 处理方法有3种:

  • 一种思路是固定消息长度(比如和客户端商定最大长度),数据包大小一样,服务器按预定长度读取,缺点是浪费带宽(用的比较少)
  • 另一种思路是按分隔符拆分,缺点是效率低(由于挨个字节判断分隔符,不能直接一次读取)
  • TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则影响server吞吐量:
    • Http1.1就是采用TLV格式(http的header里面有content-length,content-type)
    • Http2.0就是采用LTV格式(http的header里面有content-length,content-type)
java
public static void main(String[] args) throws IOException {
        log.debug("starting...");
        // 1. 创建了服务器
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false); // 非阻塞模式
        // 2. 绑定监听端口
        ssc.bind(new InetSocketAddress(8090));
        Selector selector = Selector.open();
        /* 一共有4个事件:
        * accept(接收客户端连接触发),
        * connect(客户端用的:和服务端建立连接触发),
        * read(可读事件),
        * write(可写事件)
        */
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // 轮询选择器
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 自己删除key,防止重复处理
                iterator.remove();
                if (key.isAcceptable()) {
                    log.debug("connected... {}", key);
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    // 设置非阻塞模式,可以使得read()不再阻塞的了
                    sc.configureBlocking(false);
                    // ByteBuffer
                    ByteBuffer buffer = ByteBuffer.allocate(16);
                    // 注册读事件, 第三个参数为附件,就是注册的时候,可以在channel传递一个对象,方便后续使用, 注册后附件就会和channel,selectionKey关联上了。
                    SelectionKey selectionKey = sc.register(selector, SelectionKey.OP_READ, buffer);
                    log.debug("scKey: {}", selectionKey);
                } else if (key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 5. 接收客户端发送的数据
                    /**
                     * 从chanel中读取数据,如果没读完,可以第二次读取剩余的,这就涉及到两次读的数据合并问题,
                     * 因为我们的bytebuffer是在if代码块中,作用域很小,第二次读完数据无法合并第一次数据,
                     * 如果直接将bytebuffer往外提,提高作用域范围,又会导致读写混乱的问题,
                     * 因为有服务端会被多个客户端连接,所以需要区分是哪个客户端的数据。这时就需要用到附件了。
                     *  */ 
                    // 获取附件
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,返回0
                    if (read > 0) {
                        // 需要扩容
                        if (buffer.position() == buffer.limit()) {
                            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                            buffer.flip();
                            newBuffer.put(buffer); // 0123456789abcdef3333\n
                            key.attach(newBuffer);
                        }else{
                            log.debug("read...{}", new String(buffer.array(), 0, read));
                            buffer.flip();
                            buffer.clear();
                            log.debug("after read...{}", channel);
                        }
                    }else{// 断开连接返回值为-1
                        log.debug("read...{}", read);
                        // 客户端断开连接处理
                        key.cancel();
                    }
                }
            }
        }
    }
}

ByteBuffer大小分配

  1. 每个channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer
  2. ByteBuffer不能太大,比如一个ByteBuffer 1Mb的话,要支持百万连接就要1Tb内存,因此需要设计大小可变的 ByteBuffer
  • 一种思路是首先分配一个较小的buffer,例如4k,如果发现数据不够,再分配8k的buffer,将4k buffer内容拷贝至8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
  • 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

6. 处理write事件

6.1 一次无法写完例子

  1. 非阻塞模式下,无法保证把buffer中所有数据都写入channel,因此需要追踪write方法的返回值(代表实际写入字节数)
  2. 用selector监听所有channel的可写事件,每个channel都需要一个key来跟踪buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将channel注册到selector上
    • selector检查channel上的可写事件,如果所有的数据写完了,就取消channel的注册
    • 如果不取消,会每次可写均会触发write事件
java
public static void main(String[] args) throws InterruptedException, IOException {
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.configureBlocking(false);
    Selector selector = Selector.open();
    // ssc只能注册accept事件
    ssc.register(selector, SelectionKey.OP_ACCEPT);
    ssc.bind(new InetSocketAddress(8080));
    while (true) {
        // 没有事件发生,线程阻塞,有事件,线程才会恢复运行
        selector.select();
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            if (key.isAcceptable()) {
                // channel其实就是ssc
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                channel.configureBlocking(false);
                SocketChannel socketChannel = channel.accept();
                socketChannel.configureBlocking(false);
                // socketChannel只能注册read事件
                SelectionKey scKey = socketChannel.register(selector, SelectionKey.OP_READ);
                // 写数据到客户端
                // 造大数据包
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < 10000000; i++) {
                    sb.append("a");
                }
                ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                // 底层使用操作系统的缓存区写,有可能写不完
                int write = socketChannel.write(buffer);
                // 3. write 表示实际写了多少字节
                System.out.println("实际写入字节:" + write);
                // 4. 如果buffer里没写完,还有剩余未读字节,需要关注写事件
                if (buffer.hasRemaining()) {
                    // 在原有关注事件的基础上,关注写事件,不覆盖以前的
                    scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
                    // 或者使用位运算 scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
                    // 把 buffer 作为附件加入 sckey
                    scKey.attach(buffer);
                }
            }else if (key.isWritable()) {
                //从buffer里面拿出buffer,继续写
                ByteBuffer buffer = (ByteBuffer) key.attachment();
                SocketChannel sc = (SocketChannel) key.channel();
                int write = sc.write(buffer);
                System.out.println("实际写入字节:" + write);
                // 写完了,就不再关注写事件,并释放buffer的内存(只需要没人再用,gc会回收)
                if (!buffer.hasRemaining()) {
                    key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                    key.attach(null);
                }
            }
        }
    }
}