网络NIO
1. ServerSocketChannel
ServerSocketChannel负责监听传入的连接和创建新的SocketChannel对象,它本身从不传输数据, 也就是本身不实现读和写功能。ServerSocketChannel是一个基于通道的socket监听器,和java.net.ServerSocket作用一样.
1.1 阻塞模式
阻塞模式下,相关方法都会导致线程暂停
- ServerSocketChannel.accept()会在没有连接建立时让线程暂停
- SocketChannel.read()会在没有数据可读时让线程暂停
- 阻塞的表现其实就是线程暂停了,暂停期间不会占用cpu,但线程相当于处于闲置
单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持。但多线程下,有新的问题,体现在以下方面:
- 32位jvm一个线程320k,64位jvm一个线程1024k,如果连接数过多,必然导致OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
- 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
public class Test01 {
static Logger log = LoggerFactory.getLogger(Test01.class);
public static void main(String[] args) throws IOException {
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8090));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
// 放在while里面, 客户端可以不断连接过来
while (true) {
// 4. accept 建立与客户端连接, SocketChannel用来与客户端之间通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法,线程停止运行
log.debug("connected... {}", sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
log.debug("before read... {}", channel);
channel.read(buffer); // 阻塞方法,线程停止运行
buffer.flip();
buffer.clear();
log.debug("after read...{}", channel);
}
}
}
}
1.2 非阻塞模式
非阻塞模式下,相关方法都会不会让线程暂停
- 在ServerSocketChannel.accept()在没有连接建立时,会返回null,继续运行
- SocketChannel.read在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannel的read或是去执行ServerSocketChannel.accept
- 写数据时,线程只是等待数据写入Channel即可,无需等Channel通过网络把数据发送出去
- 但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了cpu
- 数据复制过程中,线程实际还是阻塞的(AIO改进的地方)
public class Test02 {
static Logger log = LoggerFactory.getLogger(Test02.class);
public static void main(String[] args) throws IOException {
// 使用nio来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8090));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept建立与客户端连接, SocketChannel用来与客户端之间通信
SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
if (sc != null) {
log.debug("connected... {}", sc);
// 设置非阻塞模式,可以使得read()不再阻塞的了
sc.configureBlocking(false);
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,返回0
if (read > 0) {
buffer.flip();
log.debug("read...{}", new String(buffer.array(), 0, read));
buffer.clear();
log.debug("after read...{}", channel);
}
}
}
}
}
cmd窗口输入: telent 127.0.0.1 8090
, 运行结果: 客户端接收到服务端返回消息
非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了cpu,另外数据复制过程中,线程实际还是阻塞的。
1.3 使用selector
public class Test03 {
static Logger log = LoggerFactory.getLogger(Test03.class);
public static void main(String[] args) throws IOException {
log.debug("starting...");
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8090));
Selector selector = Selector.open();
/* 一共有4个事件:
* accept(接收客户端连接触发),
* connect(客户端用的:和服务端建立连接触发),
* read(可读事件),
* write(可写事件)
*/
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 轮询选择器
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 自己删除key,防止重复处理
iterator.remove();
if (key.isAcceptable()) {
log.debug("connected... {}", key);
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
// 设置非阻塞模式,可以使得read()不再阻塞的了
sc.configureBlocking(false);
SelectionKey selectionKey = sc.register(selector, SelectionKey.OP_READ, null);
log.debug("scKey: {}", selectionKey);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
// ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 5. 接收客户端发送的数据
int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,返回0
if (read > 0) {
log.debug("read...{}", new String(buffer.array(), 0, read));
buffer.flip();
buffer.clear();
log.debug("after read...{}", channel);
}else{// 断开连接返回值为-1
log.debug("read...{}", read);
// 客户端断开连接处理
key.cancel();
}
}
}
}
}
}
为何要执行iter.remove()
因为select在事件发生后,就会将相关的key放入selectedKeys集合,但不会在处理完后从selectedKeys集合中移除,需要我们自己编码删除。例如
- 第一次触发了ssckey上的accept事件,没有移除ssckey
- 第二次触发了sckey上的read事件,但这时selectedKeys中还有上次的ssckey,在处理时因为没有真正的 serverSocket连上了,就会导致空指针异常
3. SocketChannel
Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道,主要用途用来处理网络I/O的通道。SocketChannel可以被多路复用。
3.1 SocketChannel特点
- 对于已经存在的socket不能创建SocketChannel
- SocketChannel中提供的open接口创建的Channel并没有进行网络级联,需要使用connect接口连接到指定地址
- 未进行连接的SocketChannle执行I/O操作时,会抛出
NotYetConnectedException
- SocketChannel支持两种I/O模式:阻塞式和非阻塞式
- SocketChannel支持异步关闭。如果SocketChannel在一个线程上read阻塞,另一个线程对该SocketChannel调用
shutdownInput
,则读阻塞的线程将返回-1
表示没有读取任何数据;如果SocketChannel在一个线程上write阻塞,另一个线程对该SocketChannel调用shutdownWrite
,则写阻塞的线程将抛出AsynchronousCloseException - SocketChannel 支持设定参数:
- SO_SNDBUF 套接字发送缓冲区大小
- SO_RCVBUF 套接字接收缓冲区大小
- SO_KEEPALIVE 保活连接
- O_REUSEADDR 复用地址
- SO_LINGER 有数据传输时延缓关闭Channel(只有在非阻塞模式下有用)
- TCP_NODELAY 禁用Nagle算法
3.2 SocketChannel的使用
public static void main(String[] args) throws IOException {
// 创建 SocketChannel
// 方式一
// SocketChannel socketChanne2 = SocketChannel.open();
// socketChanne2.connect(new InetSocketAddress("www.baidu.com", 80));
// 方式二
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.sina.com", 80));
boolean open = socketChannel.isOpen();// 测试 SocketChannel 是否为 open 状态
System.out.println("=======open============" + open);
boolean connected = socketChannel.isConnected();//测试 SocketChannel 是否已经被连接
System.out.println("=========connected==========" + connected);
boolean connectionPending = socketChannel.isConnectionPending();//测试 SocketChannel 是否正在进行连接
System.out.println("========connectionPending===========" + connectionPending);
boolean finishConnect = socketChannel.finishConnect();//校验正在进行套接字连接的 SocketChannel是否已经完成连接
System.out.println("=========finishConnect==========" + finishConnect);
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(32);
socketChannel.read(byteBuffer);
Boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
Integer soRcvbuf = socketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
// 反转读写模式
byteBuffer.flip();
while(byteBuffer.hasRemaining()) {
System.out.println(byteBuffer.get());
}
socketChannel.close();
System.out.println("read over");
System.out.println(keepAlive);
System.out.println(soRcvbuf);
}
4. DatagramChannel
使用DatagramChannel来处理UDP的数据传输。和TCP不同,UDP不是面向连接的协议。使用UDP时,只要知道服务器的IP和端口就可以直接向对方发送数据。
@Test
public void testClient() throws IOException {
// 打开 DatagramChannel
DatagramChannel receiveChannel = DatagramChannel.open();
//设置为非阻塞模式
receiveChannel.configureBlocking(false);
receiveChannel.bind(new InetSocketAddress(10086));
ByteBuffer receiveBuffer = ByteBuffer.allocate(512);
while (true) {
receiveBuffer.clear();
// 通过 receive()接收 UDP 包
SocketAddress receiveAddr = receiveChannel.receive(receiveBuffer);
receiveBuffer.flip();
System.out.print(receiveAddr.toString() + " ");
System.out.println(Charset.forName("UTF-8").decode(receiveBuffer));
}
}
@Test
// 发送数据
public void testServer() throws IOException, InterruptedException {
// 打开 DatagramChannel
DatagramChannel server = DatagramChannel.open();
//设置为非阻塞模式
server.configureBlocking(false);
while (true){
// 通过send()发送UDP包
server.send(ByteBuffer.wrap("server msg: send".getBytes()), new InetSocketAddress("127.0.0.1",10086));
System.out.println("发包端发包");
Thread.sleep(1000);
}
}
5. 处理read事件
💡 为何要 iter.remove()
因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如
- 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
- 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常
public class ChannelDemo6 {
public static void main(String[] args) {
try (ServerSocketChannel channel = ServerSocketChannel.open()) {
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
log.debug("select count: {}", count);
// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
log.debug("连接已建立: {}", sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if(read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
debug(buffer);
}
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
💡 cancel 的作用
cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件
5.1 处理消息的边界
如果我们读消息的时候,设置的ByteBuffer过小,则可能出现乱码的情况: 处理方法有3种:
- 一种思路是固定消息长度(比如和客户端商定最大长度),数据包大小一样,服务器按预定长度读取,缺点是浪费带宽(用的比较少)
- 另一种思路是按分隔符拆分,缺点是效率低(由于挨个字节判断分隔符,不能直接一次读取)
- TLV格式,即Type类型、Length长度、Value数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的buffer,缺点是buffer需要提前分配,如果内容过大,则影响server吞吐量:
- Http1.1就是采用TLV格式(http的header里面有content-length,content-type)
- Http2.0就是采用LTV格式(http的header里面有content-length,content-type)
public static void main(String[] args) throws IOException {
log.debug("starting...");
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8090));
Selector selector = Selector.open();
/* 一共有4个事件:
* accept(接收客户端连接触发),
* connect(客户端用的:和服务端建立连接触发),
* read(可读事件),
* write(可写事件)
*/
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 轮询选择器
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 自己删除key,防止重复处理
iterator.remove();
if (key.isAcceptable()) {
log.debug("connected... {}", key);
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
// 设置非阻塞模式,可以使得read()不再阻塞的了
sc.configureBlocking(false);
// ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 注册读事件, 第三个参数为附件,就是注册的时候,可以在channel传递一个对象,方便后续使用, 注册后附件就会和channel,selectionKey关联上了。
SelectionKey selectionKey = sc.register(selector, SelectionKey.OP_READ, buffer);
log.debug("scKey: {}", selectionKey);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
// 5. 接收客户端发送的数据
/**
* 从chanel中读取数据,如果没读完,可以第二次读取剩余的,这就涉及到两次读的数据合并问题,
* 因为我们的bytebuffer是在if代码块中,作用域很小,第二次读完数据无法合并第一次数据,
* 如果直接将bytebuffer往外提,提高作用域范围,又会导致读写混乱的问题,
* 因为有服务端会被多个客户端连接,所以需要区分是哪个客户端的数据。这时就需要用到附件了。
* */
// 获取附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,返回0
if (read > 0) {
// 需要扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
key.attach(newBuffer);
}else{
log.debug("read...{}", new String(buffer.array(), 0, read));
buffer.flip();
buffer.clear();
log.debug("after read...{}", channel);
}
}else{// 断开连接返回值为-1
log.debug("read...{}", read);
// 客户端断开连接处理
key.cancel();
}
}
}
}
}
}
ByteBuffer大小分配
- 每个channel都需要记录可能被切分的消息,因为ByteBuffer不能被多个channel共同使用,因此需要为每个channel维护一个独立的ByteBuffer
- ByteBuffer不能太大,比如一个ByteBuffer 1Mb的话,要支持百万连接就要1Tb内存,因此需要设计大小可变的 ByteBuffer
- 一种思路是首先分配一个较小的buffer,例如4k,如果发现数据不够,再分配8k的buffer,将4k buffer内容拷贝至8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
- 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
6. 处理write事件
6.1 一次无法写完例子
- 非阻塞模式下,无法保证把buffer中所有数据都写入channel,因此需要追踪write方法的返回值(代表实际写入字节数)
- 用selector监听所有channel的可写事件,每个channel都需要一个key来跟踪buffer,但这样又会导致占用内存过多,就有两阶段策略
- 当消息处理器第一次写入消息时,才将channel注册到selector上
- selector检查channel上的可写事件,如果所有的数据写完了,就取消channel的注册
- 如果不取消,会每次可写均会触发write事件
public static void main(String[] args) throws InterruptedException, IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
// ssc只能注册accept事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 没有事件发生,线程阻塞,有事件,线程才会恢复运行
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// channel其实就是ssc
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
channel.configureBlocking(false);
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
// socketChannel只能注册read事件
SelectionKey scKey = socketChannel.register(selector, SelectionKey.OP_READ);
// 写数据到客户端
// 造大数据包
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 10000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 底层使用操作系统的缓存区写,有可能写不完
int write = socketChannel.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果buffer里没写完,还有剩余未读字节,需要关注写事件
if (buffer.hasRemaining()) {
// 在原有关注事件的基础上,关注写事件,不覆盖以前的
scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
// 或者使用位运算 scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
scKey.attach(buffer);
}
}else if (key.isWritable()) {
//从buffer里面拿出buffer,继续写
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
// 写完了,就不再关注写事件,并释放buffer的内存(只需要没人再用,gc会回收)
if (!buffer.hasRemaining()) {
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}