NIO基础
阻塞模式
打开服务器套接字通道
//使用nio 来理解阻塞模式
ServerSocketChannel ssc = ServerSocketChannel.open();
绑定服务器监听端口
ssc.bind(new InetSocketAddress(8080));
建立与客户端之间的连接
SocketChannel用来与客户端之间通信
//建立与客户端的连接
SocketChannel socketChannel = ssc.accept();
接收数据调用read方法
需要提供一个ByteBuffer
socketChannel.read(buffer);
Server
public class Server {
//全局ByteBuffer
public static ByteBuffer buffer = ByteBuffer.allocate(16);
public static void main(String[] args) throws IOException {
//使用nio 来理解阻塞模式
ServerSocketChannel ssc = ServerSocketChannel.open();
//绑定服务器监听端口
ssc.bind(new InetSocketAddress(8080));
//建立连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true){
System.out.println("connecting...");
//建立与客户端的连接,阻塞方法
SocketChannel socketChannel = ssc.accept();
//添加连接
channels.add(socketChannel);
//接收客户端发送的数据
for (SocketChannel channel : channels) {
System.out.println("reading...");
//接收数据调用read方法
socketChannel.read(buffer);
System.out.println(Charset.defaultCharset().decode(buffer));
//切换回写模式,以便于下次读取
buffer.flip();
buffer.clear();
}
}
}
}
Client
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
int i = 0;
//建立连接
SocketChannel socketChannel = null;
while (true) {
try {
if (socketChannel == null || !socketChannel.isConnected()) {
System.out.println("connecting...");
socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
}
//发送消息
String message = i+++"Hello Server!";
ByteBuffer buffer = Charset.defaultCharset().encode(message);
System.out.println("sending...");
socketChannel.write(buffer);
//保持建立通讯,每隔4秒发送一次消息
Thread.sleep(4000);
} catch (IOException e) {
System.out.println("connection failed, retrying...");
socketChannel = null;
}
}
}
}
此时处于阻塞模式下,当Sever执行accept时,则被阻塞,无法执行后续的read操作,因此Client发送的消息一直无法得到处理
非阻塞模式
//切换为非阻塞模式
ssc.configureBlocking(false);
Server
public class Server {
private static ByteBuffer buffer = ByteBuffer.allocate(100);
public static void main(String[] args) throws IOException {
//使用nio 来理解阻塞模式
ServerSocketChannel ssc = ServerSocketChannel.open();
//切换为非阻塞模式
ssc.configureBlocking(false);
//绑定服务器监听端口
ssc.bind(new InetSocketAddress(8080));
//建立连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true){
//建立与客户端的连接,阻塞方法
SocketChannel socketChannel = ssc.accept();
if (socketChannel != null){
System.out.println("connect success!");
//添加连接
channels.add(socketChannel);
}
for (SocketChannel channel : channels) {
//接收客户端发送的数据
System.out.println("reading...");
//接收数据调用read方法,非阻塞
int read = channel.read(buffer);//如果没有读取到数据则返回0
if (read > 0){
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
//切换回写模式,以便于下次读取
buffer.clear();
}
}
}
}
}
Client
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
int i = 0;
//建立连接
SocketChannel socketChannel = null;
while (true) {
try {
if (socketChannel == null || !socketChannel.isConnected()) {
System.out.println("connecting...");
socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
}
//发送消息
String message = "Hello Server!"+i++;
ByteBuffer buffer = Charset.defaultCharset().encode(message);
System.out.println("sending...");
socketChannel.write(buffer);
//保持建立通讯,每隔4秒发送一次消息
Thread.sleep(4000);
} catch (IOException e) {
System.out.println("connection failed, retrying...");
socketChannel = null;
}
}
}
}
非阻塞模式下,服务端不会因为accept方法而阻塞,而是会继续往下运行处理工作
弊端:但是线程过于繁忙,在无连接请求或是无数据可读时也会不断循环
- accept事件 会在有连接请求时触发
- connect事件 客户端连接建立后触发的事件
- read事件 可读事件
- write事件 可写事件
Selector模式下的非阻塞模式
处理单个事件
- 将ServerSocketChannel注册到Selector上
- 指明key所关注的事件
- 通过select方法等待事件发生
- 通过selector.selectedKeys方法获取到事件集合
- 用迭代器遍历,通过key.channel方法获取到触发事件的channel
//创建Selector,可以管理多个channel
Selector selector = Selector.open();
//建立Selector和channel的联系,0表示不关注任何事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//指明key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {
//select方法,没有事件就阻塞,有事件发生则线程才会恢复
selector.select();
//处理事件,先获取到事件集合,内部包含了所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//获取到触发事件的channel
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
System.out.println(sc);
}
}
存在未处理事件时,selector是不会阻塞的,显然我们需要去处理这些事件
可以通过key.cancel();
方法取消本次事件处理
- 因此事件发生后要么处理,要么需要取消,不能不处理
处理多个事件
两个集合:
先称注册在selector中的所有key的集合叫集合1
Selector内部存在着一个selectionKey集合,当注册channel时,就会向集合1中放一个key
当select方法发现了新的事件后,就会把该事件的发生channel的key添加到一个新集合selectedKeys中
- 注意:但是并不会主动去删除新集合中的key,只会标记为已经处理过
因此我们必须移除掉已经处理过的key,否则下次再次处理时则容易发生错误
iterator.remove();
//建立Selector和channel的联系,0表示不关注任何事件,为channel注册key
SelectionKey sscKey = ssc.register(selector, 0, null);
//指明key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {
//select方法,没有事件就阻塞,有事件发生则线程才会恢复
selector.select();
//处理事件,先获取到事件集合,内部包含了所有发生的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//要将key从集合中删除
iterator.remove();
//区分事件类型
if (key.isAcceptable()){
//获取到触发事件的channel
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
//使得channel工作在非阻塞模式下
sc.configureBlocking(false);
//注册key
SelectionKey scKey = sc.register(selector, 0, null);
//关注读事件
scKey.interestOps(SelectionKey.OP_READ);
}else if (key.isReadable()){
SocketChannel sc = (SocketChannel)key.channel();
sc.read(buffer);
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
buffer.clear();
}
}
}
我们可以通过key.isXXX方法来判断触发的事件类型
处理客户端断开
try {
SocketChannel sc = (SocketChannel)key.channel();
int read = sc.read(buffer);
if (read == -1){
//正常断开也取消此key
key.cancel();
}
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
buffer.clear();
}catch (IOException e){
e.printStackTrace();
//注销掉因为断开连接异常而触发read事件的key
key.cancel();
}
cancel()
方法 将此channel所对应的key从selector集合中注销如果是正常断开,则读的时候会读取到-1,则取消key
如果时异常断开,则在catch语句块中取消key
处理消息边界
方法一:客户端与服务端约定buffer长度
- 缺点:浪费空间
方法二:携带消息长度信息
- 比较常用
Buffer大小分配
每个channel都需要记录可能被切分的消息,因为ByteBuffer是线程不安全的,因此需要为每个channel维护一个独立的ByteBuffer
- ByteBuffer不能太大,比如一个ByteBuffer要1Mb,百万连接就要1Tb内存,因此需要设计大小可变的ByteBuffer
- 一种思路是首先分配一个较小的buffer,如4k,如果发现不够用,再分配8k的buffer,将4k的buffer内容拷贝至8k的buffer,优点是消息容易连续处理,缺点是数据拷贝消耗性能
- 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能消耗
注册时可以给selectionKey携带附件buffer
Selectionkey scKey = sc.register(selector,0,buffer);
//获取携带的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
关注两个事件
无法一次性写到buffer中时,我们应该让selectionkey去做别的事情
//第一次写发现buffer还有数据无法被写入
if(buffer.hasRemaining()){
sckey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
//将未写完的数据附带给selectionkey
sckey.attach(buffer);
}
//如果可写
if(key.isWritable()){
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
//写完后清理附件
if(!buffer.hasRemaining()){
key.attach(null);
//不再关注可写事件
key.interestOps(key.interestOps()-Selection.OP_WRITE);
}
}
阻塞VS非阻塞
阻塞模式下,相关方法都会导致线程暂停
- ServerSocketChannel.accept() 会在没有连接建立的时候让线程暂停
- SocketChannel.read() 会在没有数据可读的时候让线程暂停
- 阻塞的表现就是线程暂停了,暂停期间不会占用cpu,但是线程处于闲置状态
- 单线程下,阻塞方法之间相互影响,不能工作,需要多线程支持
- 多线程下,jvm线程占用内存,如果连接数量过多,会导致OOM,会因为线程过多频繁切换上下文导致性能下降
- 可以采用线程池,但不适合长连接
非阻塞模式下,相关方法不会让线程暂停
- 在ServerSocketChannel.accept() 在没有建立连接时,会返回null,继续运行
- SocketChannel.read() 在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannel的read或是去执行ServerSocketChannel.accept()
- 写数据时,线程只是等待写入Channel即可,无需等待Channel通过网络把数据发送出去
- 非阻塞模式下,没有连接建立,和可读数据,线程仍然不断运行,浪费了cpu资源
多路复用Selector
适用于网络IO
即单个线程可以同时监听多个channel的事件
单线程可以配合Selector完成对多个Channel可读写事件的监控,着称之为多路复用
select何时不阻塞?
- 事件发生的时候
- 客户端发起连接请求,触发accept事件
- 客户端发送数据,客户端正常,异常关闭,都会触发read事件,另外如果发送的数据大于缓冲区,还会触发多次读取事件
- channel可写,会触发write事件
- 在linux下nio bug发生时
- 调用selector.wakeup()
- 调用selector.close()
- selector所在线程interrupt
多线程配合Selector优化
对象:Boss,worker
Boss只关注连接,不负责数据读写
- thread + selector
worker负责数据读写
- thread + selector
worker数不应该设置过多,要考虑cpu核心数合理分配
worker配合selector管理多个channel
场景问题:selector的select方法执行时,会导致register关联selector时阻塞
- 通过消息队列来解决线程之间交流问题
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
//向队列中添加了任务,但是任务并不立刻执行
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
服务端代码
package com.os467;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Demo1 {
private static int index = 0;
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel scc = ServerSocketChannel.open();
scc.configureBlocking(false);
Selector boss = Selector.open();
scc.bind(new InetSocketAddress(8080));
scc.register(boss, SelectionKey.OP_ACCEPT,null);
//创建固定数量的worker
//拿到cpu核心数量,注意在docker容器下部署拿到的会是物理数量而不是容器申请的核心数量,jdk10修复,使用jvm参数UseContainerSupport,默认开启
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-"+i);
}
while (true){
boss.select();
Set<SelectionKey> selectionKeys = boss.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
if (selectionKey.isAcceptable()){
SocketChannel sc = scc.accept();
sc.configureBlocking(false);
System.out.println("connected...");
//关联 selector
System.out.println("before register... address:"+sc.getRemoteAddress());
//round robin 轮询算法,负载均衡
workers[index++ % workers.length].register(sc);
System.out.println("after register...");
}
}
}
}
static class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;
private boolean start = false;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
/**
* 初始化线程和selector
*/
public void register(SocketChannel sc) throws IOException {
if (!start){
synchronized (this){
if (!start){
start = true;
selector = Selector.open();
thread = new Thread(this,name);
thread.start();
}
}
}
queue.add(()->{
try {
sc.register(selector,SelectionKey.OP_READ,null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
//唤醒之前被阻塞的selector
selector.wakeup();
}
@Override
public void run() {
while (true){
try {
selector.select();
Runnable task = queue.poll();
if (task != null){
task.run();
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
try {
SocketChannel sc = (SocketChannel)key.channel();
int read = sc.read(buffer);
if (read == -1){
//取消本次
key.cancel();
}
buffer.flip();
while (buffer.hasRemaining()) {
char b = (char)buffer.get();
System.out.print(b);
}
System.out.println(Thread.currentThread().getName()+":handle...");
}catch (IOException e){
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
用队列解决了boss和worker线程之间数据交流的问题
Boss只关注accept,其它任务交给worker去执行,一个worker可以关注多个SocketChannel,worker的数量尽量等于CPU核心数
客户端测试
public class WriteServer {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress(8080));
socketChannel.write(Charset.defaultCharset().encode("hello!"));
//pause..
System.in.read();
}
}
阻塞IO和非阻塞IO区别
阻塞IO(BIO)
java代码中的read是在用户态发起的调用,实际上还需要操作系统内核态下进行数据的读取
在没有读取到数据的情况下,用户线程将被阻塞在read方法上,直到等到有数据可读
在网卡中,当读取到数据后还需要进行数据的复制,然后将数据写入内存。
非阻塞IO(NIO)
非阻塞模式下用户线程下调用一次read方法,如果没有读取到数据,则立刻返回,执行下面的任务
当read方法读取到有数据时,则会等待操作系统内核态下读取数据并写入内存,这个期间还是会阻塞等待的
但是非阻塞暂停下如果频繁调用read方法会因为系统调用而影响效率
引入多路复用模式
需要引入selector对事件进行监视
select方法只阻塞一个线程,并且此线程又可以监视多个channel,实现了多路复用
同步与异步
同步:线程之间存在制约,顺序进行
异步:线程之间不存在制约,并发进行
异步不会发生线程阻塞
类似axios回调
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以邮件至 1300452403@qq.com