NIO基础

NIO基础

阻塞模式

打开服务器套接字通道

//使用nio 来理解阻塞模式
ServerSocketChannel ssc = ServerSocketChannel.open();

绑定服务器监听端口

ssc.bind(new InetSocketAddress(8080));

建立与客户端之间的连接

SocketChannel用来与客户端之间通信

//建立与客户端的连接
SocketChannel socketChannel = ssc.accept();

接收数据调用read方法

需要提供一个ByteBuffer

socketChannel.read(buffer);

Server

public class Server {

    //全局ByteBuffer
    public static ByteBuffer buffer = ByteBuffer.allocate(16);

    public static void main(String[] args) throws IOException {

        //使用nio 来理解阻塞模式
        ServerSocketChannel ssc = ServerSocketChannel.open();

        //绑定服务器监听端口
        ssc.bind(new InetSocketAddress(8080));

        //建立连接集合
        List<SocketChannel> channels = new ArrayList<>();

        while (true){
            System.out.println("connecting...");
            //建立与客户端的连接,阻塞方法
            SocketChannel socketChannel = ssc.accept();
            //添加连接
            channels.add(socketChannel);

            //接收客户端发送的数据
            for (SocketChannel channel : channels) {
                System.out.println("reading...");
                //接收数据调用read方法
                socketChannel.read(buffer);

                System.out.println(Charset.defaultCharset().decode(buffer));
                //切换回写模式,以便于下次读取
                buffer.flip();
                buffer.clear();

            }
        }

    }

}

Client

public class Client {

    public static void main(String[] args) throws IOException, InterruptedException {

        int i = 0;

        //建立连接
        SocketChannel socketChannel = null;
        while (true) {
            try {
                if (socketChannel == null || !socketChannel.isConnected()) {
                    System.out.println("connecting...");
                    socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
                }

                //发送消息
                String message = i+++"Hello Server!";
                ByteBuffer buffer = Charset.defaultCharset().encode(message);

                System.out.println("sending...");
                socketChannel.write(buffer);

                //保持建立通讯,每隔4秒发送一次消息
                Thread.sleep(4000);
            } catch (IOException e) {
                System.out.println("connection failed, retrying...");
                socketChannel = null;
            }
        }
    }
}

此时处于阻塞模式下,当Sever执行accept时,则被阻塞,无法执行后续的read操作,因此Client发送的消息一直无法得到处理

非阻塞模式

//切换为非阻塞模式
ssc.configureBlocking(false);

Server

public class Server {

    private static ByteBuffer buffer = ByteBuffer.allocate(100);

    public static void main(String[] args) throws IOException {

        //使用nio 来理解阻塞模式
        ServerSocketChannel ssc = ServerSocketChannel.open();

        //切换为非阻塞模式
        ssc.configureBlocking(false);

        //绑定服务器监听端口
        ssc.bind(new InetSocketAddress(8080));

        //建立连接集合
        List<SocketChannel> channels = new ArrayList<>();

        while (true){

            //建立与客户端的连接,阻塞方法
            SocketChannel socketChannel = ssc.accept();
            if (socketChannel != null){
                System.out.println("connect success!");
                //添加连接
                channels.add(socketChannel);
            }


            for (SocketChannel channel : channels) {
                //接收客户端发送的数据
                System.out.println("reading...");
                //接收数据调用read方法,非阻塞
                int read = channel.read(buffer);//如果没有读取到数据则返回0
               if (read > 0){
                   buffer.flip();
                   System.out.println(Charset.defaultCharset().decode(buffer));

                   //切换回写模式,以便于下次读取
                   buffer.clear();
               }
            }
        }

    }

}

Client

public class Client {

    public static void main(String[] args) throws IOException, InterruptedException {

        int i = 0;

        //建立连接
        SocketChannel socketChannel = null;
        while (true) {
            try {
                if (socketChannel == null || !socketChannel.isConnected()) {
                    System.out.println("connecting...");
                    socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
                }

                //发送消息
                String message = "Hello Server!"+i++;
                ByteBuffer buffer = Charset.defaultCharset().encode(message);

                System.out.println("sending...");
                socketChannel.write(buffer);

                //保持建立通讯,每隔4秒发送一次消息
                Thread.sleep(4000);
            } catch (IOException e) {
                System.out.println("connection failed, retrying...");
                socketChannel = null;
            }
        }
    }
}

非阻塞模式下,服务端不会因为accept方法而阻塞,而是会继续往下运行处理工作

弊端:但是线程过于繁忙,在无连接请求或是无数据可读时也会不断循环

  • accept事件 会在有连接请求时触发
  • connect事件 客户端连接建立后触发的事件
  • read事件 可读事件
  • write事件 可写事件

Selector模式下的非阻塞模式

处理单个事件

  • 将ServerSocketChannel注册到Selector上
  • 指明key所关注的事件
  • 通过select方法等待事件发生
  • 通过selector.selectedKeys方法获取到事件集合
  • 用迭代器遍历,通过key.channel方法获取到触发事件的channel
//创建Selector,可以管理多个channel
Selector selector = Selector.open();

//建立Selector和channel的联系,0表示不关注任何事件
SelectionKey sscKey = ssc.register(selector, 0, null);
//指明key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);

 while (true) {

            //select方法,没有事件就阻塞,有事件发生则线程才会恢复
            selector.select();

            //处理事件,先获取到事件集合,内部包含了所有发生的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();

                //获取到触发事件的channel
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();

                SocketChannel sc = channel.accept();

                System.out.println(sc);
            }
        }

存在未处理事件时,selector是不会阻塞的,显然我们需要去处理这些事件

可以通过key.cancel(); 方法取消本次事件处理

  • 因此事件发生后要么处理,要么需要取消,不能不处理

处理多个事件

两个集合:

先称注册在selector中的所有key的集合叫集合1

Selector内部存在着一个selectionKey集合,当注册channel时,就会向集合1中放一个key

当select方法发现了新的事件后,就会把该事件的发生channel的key添加到一个新集合selectedKeys中

  • 注意:但是并不会主动去删除新集合中的key,只会标记为已经处理过

因此我们必须移除掉已经处理过的key,否则下次再次处理时则容易发生错误

iterator.remove();

//建立Selector和channel的联系,0表示不关注任何事件,为channel注册key
SelectionKey sscKey = ssc.register(selector, 0, null);
//指明key只关注accept事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);

while (true) {

    //select方法,没有事件就阻塞,有事件发生则线程才会恢复
    selector.select();

    //处理事件,先获取到事件集合,内部包含了所有发生的事件
    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        //要将key从集合中删除
        iterator.remove();
        
        //区分事件类型
        if (key.isAcceptable()){
            //获取到触发事件的channel
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();

            SocketChannel sc = channel.accept();

            //使得channel工作在非阻塞模式下
            sc.configureBlocking(false);
            //注册key
            SelectionKey scKey = sc.register(selector, 0, null);
            //关注读事件
            scKey.interestOps(SelectionKey.OP_READ);
        }else if (key.isReadable()){
            SocketChannel sc = (SocketChannel)key.channel();
            sc.read(buffer);
            buffer.flip();
            System.out.println(Charset.defaultCharset().decode(buffer));
            buffer.clear();
        }
    }
}

我们可以通过key.isXXX方法来判断触发的事件类型

处理客户端断开

 try {
                       SocketChannel sc = (SocketChannel)key.channel();
                       int read = sc.read(buffer);
                       if (read == -1){
                           //正常断开也取消此key
                           key.cancel();
                       }
                       buffer.flip();
                       System.out.println(Charset.defaultCharset().decode(buffer));
                       buffer.clear();
                   }catch (IOException e){
                       e.printStackTrace();
                       //注销掉因为断开连接异常而触发read事件的key
                       key.cancel();
                   }

cancel() 方法 将此channel所对应的key从selector集合中注销

如果是正常断开,则读的时候会读取到-1,则取消key

如果时异常断开,则在catch语句块中取消key

处理消息边界

方法一:客户端与服务端约定buffer长度

  • 缺点:浪费空间

方法二:携带消息长度信息

  • 比较常用

Buffer大小分配

每个channel都需要记录可能被切分的消息,因为ByteBuffer是线程不安全的,因此需要为每个channel维护一个独立的ByteBuffer

  • ByteBuffer不能太大,比如一个ByteBuffer要1Mb,百万连接就要1Tb内存,因此需要设计大小可变的ByteBuffer
    • 一种思路是首先分配一个较小的buffer,如4k,如果发现不够用,再分配8k的buffer,将4k的buffer内容拷贝至8k的buffer,优点是消息容易连续处理,缺点是数据拷贝消耗性能
    • 另一种思路是用多个数组组成buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能消耗

注册时可以给selectionKey携带附件buffer

Selectionkey scKey = sc.register(selector,0,buffer);
//获取携带的附件
ByteBuffer buffer = (ByteBuffer) key.attachment(); 

关注两个事件

无法一次性写到buffer中时,我们应该让selectionkey去做别的事情

//第一次写发现buffer还有数据无法被写入
if(buffer.hasRemaining()){
    sckey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
    //将未写完的数据附带给selectionkey
    sckey.attach(buffer);
}

//如果可写
if(key.isWritable()){
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    SocketChannel sc = (SocketChannel) key.channel();
    int write = sc.write(buffer);
    //写完后清理附件
    if(!buffer.hasRemaining()){
        key.attach(null);
        //不再关注可写事件
        key.interestOps(key.interestOps()-Selection.OP_WRITE);
    }
}

阻塞VS非阻塞

阻塞模式下,相关方法都会导致线程暂停

  • ServerSocketChannel.accept() 会在没有连接建立的时候让线程暂停
  • SocketChannel.read() 会在没有数据可读的时候让线程暂停
  • 阻塞的表现就是线程暂停了,暂停期间不会占用cpu,但是线程处于闲置状态
  • 单线程下,阻塞方法之间相互影响,不能工作,需要多线程支持
  • 多线程下,jvm线程占用内存,如果连接数量过多,会导致OOM,会因为线程过多频繁切换上下文导致性能下降
  • 可以采用线程池,但不适合长连接

非阻塞模式下,相关方法不会让线程暂停

  • 在ServerSocketChannel.accept() 在没有建立连接时,会返回null,继续运行
  • SocketChannel.read() 在没有数据可读时,会返回0,但线程不必阻塞,可以去执行其它SocketChannel的read或是去执行ServerSocketChannel.accept()
  • 写数据时,线程只是等待写入Channel即可,无需等待Channel通过网络把数据发送出去
  • 非阻塞模式下,没有连接建立,和可读数据,线程仍然不断运行,浪费了cpu资源

多路复用Selector

适用于网络IO

单个线程可以同时监听多个channel的事件

单线程可以配合Selector完成对多个Channel可读写事件的监控,着称之为多路复用

select何时不阻塞?

  • 事件发生的时候
    • 客户端发起连接请求,触发accept事件
    • 客户端发送数据,客户端正常,异常关闭,都会触发read事件,另外如果发送的数据大于缓冲区,还会触发多次读取事件
    • channel可写,会触发write事件
    • 在linux下nio bug发生时
  • 调用selector.wakeup()
  • 调用selector.close()
  • selector所在线程interrupt

多线程配合Selector优化

对象:Boss,worker

Boss只关注连接,不负责数据读写

  • thread + selector

worker负责数据读写

  • thread + selector

worker数不应该设置过多,要考虑cpu核心数合理分配

worker配合selector管理多个channel

场景问题:selector的select方法执行时,会导致register关联selector时阻塞

  • 通过消息队列来解决线程之间交流问题
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

//向队列中添加了任务,但是任务并不立刻执行
queue.add(()->{
                    try {
                        sc.register(selector,SelectionKey.OP_READ,null);
                    } catch (ClosedChannelException e) {
                        e.printStackTrace();
                    }
                });

服务端代码

package com.os467;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Demo1 {

    private static int index = 0;

    public static void main(String[] args) throws IOException {

        Thread.currentThread().setName("boss");

        ServerSocketChannel scc = ServerSocketChannel.open();

        scc.configureBlocking(false);

        Selector boss = Selector.open();

        scc.bind(new InetSocketAddress(8080));

        scc.register(boss, SelectionKey.OP_ACCEPT,null);

        //创建固定数量的worker
        //拿到cpu核心数量,注意在docker容器下部署拿到的会是物理数量而不是容器申请的核心数量,jdk10修复,使用jvm参数UseContainerSupport,默认开启
        Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-"+i);
        }


        while (true){
            boss.select();

            Set<SelectionKey> selectionKeys = boss.selectedKeys();

            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {

                SelectionKey selectionKey = iterator.next();

                iterator.remove();

                if (selectionKey.isAcceptable()){
                    SocketChannel sc = scc.accept();
                    sc.configureBlocking(false);
                    System.out.println("connected...");
                    //关联 selector
                    System.out.println("before register... address:"+sc.getRemoteAddress());
                    //round robin 轮询算法,负载均衡
                    workers[index++ % workers.length].register(sc);
                    System.out.println("after register...");
                }

            }
        }
    }

    static class Worker implements Runnable {

        private Thread thread;

        private Selector selector;

        private String name;

        private boolean start = false;

        private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        public Worker(String name) {
            this.name = name;
        }

        /**
         * 初始化线程和selector
         */
        public void register(SocketChannel sc) throws IOException {
            if (!start){
                synchronized (this){
                    if (!start){
                        start = true;
                        selector = Selector.open();
                        thread = new Thread(this,name);
                        thread.start();
                    }
                }
            }
            queue.add(()->{
                try {
                    sc.register(selector,SelectionKey.OP_READ,null);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
            //唤醒之前被阻塞的selector
            selector.wakeup();
        }

        @Override
        public void run() {
            while (true){
                try {
                    selector.select();
                    Runnable task = queue.poll();
                    if (task != null){
                        task.run();
                    }


                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(16);
                            try {
                                SocketChannel sc = (SocketChannel)key.channel();
                                int read = sc.read(buffer);
                                if (read == -1){
                                    //取消本次
                                    key.cancel();
                                }
                                buffer.flip();
                                while (buffer.hasRemaining()) {
                                    char b = (char)buffer.get();
                                    System.out.print(b);
                                }
                                System.out.println(Thread.currentThread().getName()+":handle...");
                            }catch (IOException e){
                                key.cancel();
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

用队列解决了boss和worker线程之间数据交流的问题

Boss只关注accept,其它任务交给worker去执行,一个worker可以关注多个SocketChannel,worker的数量尽量等于CPU核心数

客户端测试

public class WriteServer {

    public static void main(String[] args) throws IOException {

        SocketChannel socketChannel = SocketChannel.open();

        socketChannel.connect(new InetSocketAddress(8080));

        socketChannel.write(Charset.defaultCharset().encode("hello!"));

        //pause..
        System.in.read();

    }

}

阻塞IO和非阻塞IO区别

阻塞IO(BIO)

java代码中的read是在用户态发起的调用,实际上还需要操作系统内核态下进行数据的读取

在没有读取到数据的情况下,用户线程将被阻塞在read方法上,直到等到有数据可读

在网卡中,当读取到数据后还需要进行数据的复制,然后将数据写入内存。

非阻塞IO(NIO)

非阻塞模式下用户线程下调用一次read方法,如果没有读取到数据,则立刻返回,执行下面的任务

当read方法读取到有数据时,则会等待操作系统内核态下读取数据并写入内存,这个期间还是会阻塞等待的

但是非阻塞暂停下如果频繁调用read方法会因为系统调用而影响效率

引入多路复用模式

需要引入selector对事件进行监视

select方法只阻塞一个线程,并且此线程又可以监视多个channel,实现了多路复用

同步与异步

  • 同步:线程之间存在制约,顺序进行

  • 异步:线程之间不存在制约,并发进行

    异步不会发生线程阻塞

    类似axios回调


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以邮件至 1300452403@qq.com

文章标题:NIO基础

字数:3.6k

本文作者:Os467

发布时间:2023-03-29, 21:31:10

最后更新:2023-04-02, 21:09:09

原始链接:https://os467.github.io/2023/03/29/NIO%E5%9F%BA%E7%A1%80/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

×

喜欢就点赞,疼爱就打赏