【从入门到放弃-Java】并发编程-NIO-Selector

简介: 前言 前两篇【从入门到放弃-Java】并发编程-NIO-Channel和【从入门到放弃-Java】并发编程-NIO-Buffer中我们学习了NIO中两个重要的概念Channel和Buffer。今天我们来看下另一个重要的内容 Selector 简介 Selector是多路复用器,会不断轮询已经注册了的Channel。

前言

前两篇【从入门到放弃-Java】并发编程-NIO-Channel【从入门到放弃-Java】并发编程-NIO-Buffer中我们学习了NIO中两个重要的概念Channel和Buffer。
今天我们来看下另一个重要的内容 Selector

简介

Selector是多路复用器,会不断轮询已经注册了的Channel。当有注册的channel产生连接、读、写等事件时,就会被Selector发现,从而可以进行相关后续操作。

Selector的好处是,可以通过一个线程来管理多个通道,减少了创建线程的资源占用及线程切换带来的消耗

Selector

SelectableChannel可以通过SelectionKey(记录channel和selector的注册关系)注册到Selector上。Selector维护了三个SelectionKey集合:

  • key set:存放了Selector上已经注册了的Channel的key。可以通过keys()方法获取。
  • selected-key set:当之前注册感兴趣的事件到达时,set中的keys会被更新或添加,set中维护了当前至少有一个可以操作的事件的channel key的集合。是key set的子集。可以使用selectedKeys()获取。
  • cancelled-key:存放已经调用cancel方法取消,等待下次操作时会调用deregister取消注册的channel,调用deregister后,所有的set中都没有这个channel的key了。

open

/**
 * Opens a selector.
 *
 * <p> The new selector is created by invoking the {@link
 * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
 * of the system-wide default {@link
 * java.nio.channels.spi.SelectorProvider} object.  </p>
 *
 * @return  A new selector
 *
 * @throws  IOException
 *          If an I/O error occurs
 */
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

开启selector,具体的实现会根据操作系统类型创建不同的实现类,如macOS下实际上是new了一个KQueueSelectorProvider实例,低层基于操作系统的kqueue实现。

register

protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    //新建一个SelectionKey,记录channel与selector之间的注册关系
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    //前置操作,这里主要是判断下selector是否还处于open状态
    // register (if needed) before adding to key set
    implRegister(k);

    // 添加selectionKey至key set
    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        // 更新注册的事件码
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}

注册selector和channel之间的事件关系。

select

// timeout超时
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

@Override
public final int select() throws IOException {
    return lockAndDoSelect(null, -1);
}

// 不阻塞
@Override
public final int selectNow() throws IOException {
    return lockAndDoSelect(null, 0);
}

private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
{
    synchronized (this) {
        ensureOpen();
        if (inSelect)
            throw new IllegalStateException("select in progress");
        inSelect = true;
        try {
            synchronized (publicSelectedKeys) {
                return doSelect(action, timeout);
            }
        } finally {
            inSelect = false;
        }
    }
}

protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
{
    assert Thread.holdsLock(this);

    // 如果timeout = 0时,不阻塞
    long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout
    boolean blocking = (to != 0);
    boolean timedPoll = (to > 0);

    int numEntries;
    processUpdateQueue();
    processDeregisterQueue();
    try {
        // 设置interrupt 可以处理中断信号 防止线程一直阻塞
        begin(blocking);

        // 轮询的监听,直到有注册的事件发生或超时。
        do {
            long startTime = timedPoll ? System.nanoTime() : 0;
            numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
            if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                // timed poll interrupted so need to adjust timeout
                long adjust = System.nanoTime() - startTime;
                to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                if (to <= 0) {
                    // timeout expired so no retry
                    numEntries = 0;
                }
            }
        } while (numEntries == IOStatus.INTERRUPTED);
        assert IOStatus.check(numEntries);

    } finally {
        end(blocking);
    }
    processDeregisterQueue();
    return processEvents(numEntries, action);
}

selectedKeys

public final Set<SelectionKey> selectedKeys() {
    ensureOpen();
    return publicSelectedKeys;
}

获取被事件唤醒的key
注意:当被遍历处理selectedKeys时,key被处理完需要手动remove掉,防止下次被重复消费,selectedKeys不会帮你删除已处理过的key。

close

public final void close() throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if (!open)
        return;
    implCloseSelector();
}


public final void implCloseSelector() throws IOException {
    //通知处于阻塞的select方法立即返回
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // 遍历所有的SelectionKey,取消注册
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}

SelectionKey

SelectionKey在channel register时创建。用来记录channel和selector之间的注册事件关系。
事件主要有:

  • OP_READ
  • OP_WRITE
  • OP_CONNECT
  • OP_ACCEPT

每个SelectionKey有两个由整数表示的操作集合,用来标识channel支持的操作类型。

interest set:是在创建SelectionKey时定义的,当集合中的操作发生时,将会把channel置为ready状态
ready set:检测到selector中已经就绪的操作类型集合

channel

public SelectableChannel channel() {
    return (SelectableChannel)channel;
}

获取SelectionKey中的channel

selector

public Selector selector() {
    return selector;
}

获取SelectionKey中的selector

isReadable

public final boolean isReadable() {
    return (readyOps() & OP_READ) != 0;
}

根据readyOps(readySet)判断channel是否是可读状态

isWritable

public final boolean isWritable() {
    return (readyOps() & OP_WRITE) != 0;
}

根据readyOps(readySet)判断channel是否是可写状态

isConnectable

public final boolean isConnectable() {
    return (readyOps() & OP_CONNECT) != 0;
}

根据readyOps(readySet)判断channel是否是connect状态,通常是客户端使用,判断连接是否建立

isReadable

public final boolean isAcceptable() {
    return (readyOps() & OP_ACCEPT) != 0;
}

根据readyOps(readySet)判断channel是否是accept状态,通常是服务端使用,判断是否有客户端请求建立连接

总结

通过使用selector,可以使用一个线程来管理多个连接。需要注意的一点是,通常读、写操作都是比较耗时的,为了提高服务端的性能应该把Selector::select和read、write的具体处理逻辑在不同的线程中处理。
即:使用一个线程来进行select,只做分发。在获取到就绪的SelectionKey后,通过线程池在不同的线程中处理读写操作。

通过学习完NIO相关的知识,我们可以很清楚的回答下面这个问题

  • 问:基于BIO实现的server端,当建立100个连接时,需要多少个线程?基于NIO实现的呢?
  • 答:基于BIO实现的server端,通常需要由一个线程accept,并为每个新建立的连接创建一个线程去处理IO操作,因此需要 1个accept线程+100个IO线程
    基于NIO实现的server端,使用Selector多路复用机制,由一个线程进行select,为了提高并发可以使用线程池来处理IO操作,通常为了发挥CPU的性能会创建(cpu核数 x 2)个线程来处理IO操作。因此需要 1个select线程 + cpu核数 x 2 个IO线程

更多文章见:https://nc2era.com

目录
相关文章
|
1月前
|
Java 编译器 开发者
深入理解Java内存模型(JMM)及其对并发编程的影响
【9月更文挑战第37天】在Java的世界里,内存模型是隐藏在代码背后的守护者,它默默地协调着多线程环境下的数据一致性和可见性问题。本文将揭开Java内存模型的神秘面纱,带领读者探索其对并发编程实践的深远影响。通过深入浅出的方式,我们将了解内存模型的基本概念、工作原理以及如何在实际开发中正确应用这些知识,确保程序的正确性和高效性。
|
16天前
|
存储 安全 Java
从入门到精通:Java Map全攻略,一篇文章就够了!
【10月更文挑战第17天】本文详细介绍了Java编程中Map的使用,涵盖Map的基本概念、创建、访问与修改、遍历方法、常用实现类(如HashMap、TreeMap、LinkedHashMap)及其特点,以及Map在多线程环境下的并发处理和性能优化技巧,适合初学者和进阶者学习。
32 3
|
1天前
|
Java 大数据 API
14天Java基础学习——第1天:Java入门和环境搭建
本文介绍了Java的基础知识,包括Java的简介、历史和应用领域。详细讲解了如何安装JDK并配置环境变量,以及如何使用IntelliJ IDEA创建和运行Java项目。通过示例代码“HelloWorld.java”,展示了从编写到运行的全过程。适合初学者快速入门Java编程。
|
4天前
|
存储 缓存 安全
Java内存模型(JMM):深入理解并发编程的基石####
【10月更文挑战第29天】 本文作为一篇技术性文章,旨在深入探讨Java内存模型(JMM)的核心概念、工作原理及其在并发编程中的应用。我们将从JMM的基本定义出发,逐步剖析其如何通过happens-before原则、volatile关键字、synchronized关键字等机制,解决多线程环境下的数据可见性、原子性和有序性问题。不同于常规摘要的简述方式,本摘要将直接概述文章的核心内容,为读者提供一个清晰的学习路径。 ####
16 2
|
2天前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
7天前
|
存储 安全 Java
🌟Java零基础-反序列化:从入门到精通
【10月更文挑战第21天】本文收录于「滚雪球学Java」专栏,专业攻坚指数级提升,希望能够助你一臂之力,帮你早日登顶实现财富自由🚀;同时,欢迎大家关注&&收藏&&订阅!持续更新中,up!up!up!!
33 5
|
5天前
|
安全 Java 调度
Java中的多线程编程入门
【10月更文挑战第29天】在Java的世界中,多线程就像是一场精心编排的交响乐。每个线程都是乐团中的一个乐手,他们各自演奏着自己的部分,却又和谐地共同完成整场演出。本文将带你走进Java多线程的世界,让你从零基础到能够编写基本的多线程程序。
17 1
|
11天前
|
Java 数据处理 开发者
Java多线程编程的艺术:从入门到精通####
【10月更文挑战第21天】 本文将深入探讨Java多线程编程的核心概念,通过生动实例和实用技巧,引导读者从基础认知迈向高效并发编程的殿堂。我们将一起揭开线程管理的神秘面纱,掌握同步机制的精髓,并学习如何在实际项目中灵活运用这些知识,以提升应用性能与响应速度。 ####
36 3
|
13天前
|
Java
Java中的多线程编程:从入门到精通
本文将带你深入了解Java中的多线程编程。我们将从基础概念开始,逐步深入探讨线程的创建、启动、同步和通信等关键知识点。通过阅读本文,你将能够掌握Java多线程编程的基本技能,为进一步学习和应用打下坚实的基础。
|
14天前
|
存储 安全 Java
从入门到精通:Java Map全攻略,一篇文章就够了!
【10月更文挑战第19天】本文介绍了Java编程中重要的数据结构——Map,通过问答形式讲解了Map的基本概念、创建、访问与修改、遍历方法、常用实现类(如HashMap、TreeMap、LinkedHashMap)及其特点,以及Map在多线程环境下的使用和性能优化技巧,适合初学者和进阶者学习。
39 4