JUC 常用 4 大并发工具类是哪几个?(面试必问)(2)

简介: JUC 常用 4 大并发工具类是哪几个?(面试必问)(2)

Semaphore:

Semaphore,俗称信号量,作用于控制同时访问某个特定资源的线程数量,用在流量控制

一说特定资源控制,那么第一时间就想到了数据库连接..

之前用等待超时模式写了一个数据库连接池,打算用这个Semaphone也写一个


/**
 * Creates a {@code Semaphore} with the given number of
 * permits and nonfair fairness setting.
 *
 * @param permits the initial number of permits available.
 *        This value may be negative, in which case releases
 *        must occur before any acquires will be granted.
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

在源码中可以看到在构建Semaphore信号量的时候,需要传入许可证的数量,这个数量就是资源的最大允许的访问的线程数


接下里用信号量实现一个数据库连接池


连接对象

package org.dance.day2.util.pool;
import org.dance.tools.SleepTools;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
/**
 * 数据库连接
 * @author ZYGisComputer
 */
public class SqlConnection implements Connection {
    /**
     * 获取数据库连接
     * @return
     */
    public static final Connection fetchConnection(){
        return new SqlConnection();
    }
    @Override
    public void commit() throws SQLException {
        SleepTools.ms(70);
    }
    @Override
    public Statement createStatement() throws SQLException {
        SleepTools.ms(1);
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return null;
    }
    @Override
    public String nativeSQL(String sql) throws SQLException {
        return null;
    }
    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
    }
    @Override
    public boolean getAutoCommit() throws SQLException {
        return false;
    }
    @Override
    public void rollback() throws SQLException {
    }
    @Override
    public void close() throws SQLException {
    }
    @Override
    public boolean isClosed() throws SQLException {
        return false;
    }
    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return null;
    }
    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
    }
    @Override
    public boolean isReadOnly() throws SQLException {
        return false;
    }
    @Override
    public void setCatalog(String catalog) throws SQLException {
    }
    @Override
    public String getCatalog() throws SQLException {
        return null;
    }
    @Override
    public void setTransactionIsolation(int level) throws SQLException {
    }
    @Override
    public int getTransactionIsolation() throws SQLException {
        return 0;
    }
    @Override
    public SQLWarning getWarnings() throws SQLException {
        return null;
    }
    @Override
    public void clearWarnings() throws SQLException {
    }
    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return null;
    }
    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return null;
    }
    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
    }
    @Override
    public void setHoldability(int holdability) throws SQLException {
    }
    @Override
    public int getHoldability() throws SQLException {
        return 0;
    }
    @Override
    public Savepoint setSavepoint() throws SQLException {
        return null;
    }
    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return null;
    }
    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
    }
    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
    }
    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return null;
    }
    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return null;
    }
    @Override
    public Clob createClob() throws SQLException {
        return null;
    }
    @Override
    public Blob createBlob() throws SQLException {
        return null;
    }
    @Override
    public NClob createNClob() throws SQLException {
        return null;
    }
    @Override
    public SQLXML createSQLXML() throws SQLException {
        return null;
    }
    @Override
    public boolean isValid(int timeout) throws SQLException {
        return false;
    }
    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
    }
    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
    }
    @Override
    public String getClientInfo(String name) throws SQLException {
        return null;
    }
    @Override
    public Properties getClientInfo() throws SQLException {
        return null;
    }
    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return null;
    }
    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return null;
    }
    @Override
    public void setSchema(String schema) throws SQLException {
    }
    @Override
    public String getSchema() throws SQLException {
        return null;
    }
    @Override
    public void abort(Executor executor) throws SQLException {
    }
    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
    }
    @Override
    public int getNetworkTimeout() throws SQLException {
        return 0;
    }
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return null;
    }
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return false;
    }
}


连接池对象

package org.dance.day2.util.pool;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
/**
 * 使用信号量控制数据库的链接和释放
 *
 * @author ZYGisComputer
 */
public class DBPoolSemaphore {
    /**
     * 池容量
     */
    private final static int POOL_SIZE = 10;
    /**
     * useful 代表可用连接
     * useless 代表已用连接
     *  为什么要使用两个Semaphore呢?是因为,在连接池中不只有连接本身是资源,空位也是资源,也需要记录
     */
    private final Semaphore useful, useless;
    /**
     * 连接池
     */
    private final static LinkedList<Connection> POOL = new LinkedList<>();
    /**
     * 使用静态块初始化池
     */
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            POOL.addLast(SqlConnection.fetchConnection());
        }
    }
    public DBPoolSemaphore() {
        // 初始可用的许可证等于池容量
        useful = new Semaphore(POOL_SIZE);
        // 初始不可用的许可证容量为0
        useless = new Semaphore(0);
    }
    /**
     * 获取数据库连接
     *
     * @return 连接对象
     */
    public Connection takeConnection() throws InterruptedException {
        // 可用许可证减一
        useful.acquire();
        Connection connection;
        synchronized (POOL) {
            connection = POOL.removeFirst();
        }
        // 不可用许可证数量加一
        useless.release();
        return connection;
    }
    /**
     * 释放链接
     *
     * @param connection 连接对象
     */
    public void returnConnection(Connection connection) throws InterruptedException {
        if(null!=connection){
            // 打印日志
            System.out.println("当前有"+useful.getQueueLength()+"个线程等待获取连接,,"
                    +"可用连接有"+useful.availablePermits()+"个");
            // 不可用许可证减一
            useless.acquire();
            synchronized (POOL){
                POOL.addLast(connection);
            }
            // 可用许可证加一
            useful.release();
        }
    }
}

测试类:

package org.dance.day2.util.pool;
import org.dance.tools.SleepTools;
import java.sql.Connection;
import java.util.Random;
/**
 * 测试Semaphore
 * @author ZYGisComputer
 */
public class UseSemaphore {
    /**
     * 连接池
     */
    public static final DBPoolSemaphore pool = new DBPoolSemaphore();
    private static class BusiThread extends Thread{
        @Override
        public void run() {
            // 随机数工具类 为了让每个线程持有连接的时间不一样
            Random random = new Random();
            long start = System.currentTimeMillis();
            try {
                Connection connection = pool.takeConnection();
                System.out.println("Thread_"+Thread.currentThread().getId()+
                        "_获取数据库连接耗时["+(System.currentTimeMillis()-start)+"]ms.");
                // 模拟使用连接查询数据
                SleepTools.ms(100+random.nextInt(100));
                System.out.println("查询数据完成归还连接");
                pool.returnConnection(connection);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            BusiThread busiThread = new BusiThread();
            busiThread.start();
        }
    }
}

测试返回结果:

Thread_11_获取数据库连接耗时[0]ms.
Thread_12_获取数据库连接耗时[0]ms.
Thread_13_获取数据库连接耗时[0]ms.
Thread_14_获取数据库连接耗时[0]ms.
Thread_15_获取数据库连接耗时[0]ms.
Thread_16_获取数据库连接耗时[0]ms.
Thread_17_获取数据库连接耗时[0]ms.
Thread_18_获取数据库连接耗时[0]ms.
Thread_19_获取数据库连接耗时[0]ms.
Thread_20_获取数据库连接耗时[0]ms.
查询数据完成归还连接
当前有40个线程等待获取连接,,可用连接有0个
Thread_21_获取数据库连接耗时[112]ms.
查询数据完成归还连接
...................查询数据完成归还连接
当前有2个线程等待获取连接,,可用连接有0个
Thread_59_获取数据库连接耗时[637]ms.
查询数据完成归还连接
当前有1个线程等待获取连接,,可用连接有0个
Thread_60_获取数据库连接耗时[660]ms.
查询数据完成归还连接
当前有0个线程等待获取连接,,可用连接有0个
查询数据完成归还连接...................
当前有0个线程等待获取连接,,可用连接有8个
查询数据完成归还连接
当前有0个线程等待获取连接,,可用连接有9个


通过执行结果可以很明确的看到,一上来就有10个线程获取到了连接,,然后后面的40个线程进入阻塞,然后只有释放链接之后,等待的线程就会有一个拿到,然后越后面的线程等待的时间就越长,然后一直到所有的线程执行完毕


最后打印的可用连接有九个不是因为少了一个是因为在释放之前打印的,不是错误


从结果中可以看到,我们对连接池中的资源的到了控制,这就是信号量的流量控制




Exchanger:

Exchanger,俗称交换器,用于在线程之间交换数据,但是比较受限,因为只能两个线程之间交换数据

/**
 * Creates a new Exchanger.
 */
public Exchanger() {
    participant = new Participant();
}


这个构造函数没有什么好说的,也没有入参,只有在创建的时候指定一下需要交换的数据的泛型即可,下面看代码

package org.dance.day2.util;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
/**
 * 线程之间交换数据
 * @author ZYGisComputer
 */
public class UseExchange {
    private static final Exchanger<Set<String>> exchanger = new Exchanger<>();
    public static void main(String[] args) {
        new Thread(){
            @Override
            public void run() {
                Set<String> aSet = new HashSet<>();
                aSet.add("A");
                aSet.add("B");
                aSet.add("C");
                try {
                    Set<String> exchange = exchanger.exchange(aSet);
                    for (String s : exchange) {
                        System.out.println("aSet"+s);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
        new Thread(){
            @Override
            public void run() {
                Set<String> bSet = new HashSet<>();
                bSet.add("1");
                bSet.add("2");
                bSet.add("3");
                try {
                    Set<String> exchange = exchanger.exchange(bSet);
                    for (String s : exchange) {
                        System.out.println("bSet"+s);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}

执行结果:

bSetAbSetBbSetCaSet1aSet2aSet3

通过执行结果可以清晰的看到,两个线程中的数据发生了交换,这就是Exchanger的线程数据交换了

以上就是JUC的4大常用并发工具类了


相关文章
|
8月前
|
存储 NoSQL Redis
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 + 无锁架构 + EDA架构 + 异步日志 + 集群架构
阿里面试:Redis 为啥那么快?怎么实现的100W并发?说出了6大架构,面试官跪地: 纯内存 + 尖端结构 +  无锁架构 +  EDA架构  + 异步日志 + 集群架构
|
缓存 NoSQL 关系型数据库
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
本文详解缓存雪崩、缓存穿透、缓存并发及缓存预热等问题,提供高可用解决方案,帮助你在大厂面试和实际工作中应对这些常见并发场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
|
存储 缓存 安全
【Java面试题汇总】多线程、JUC、锁篇(2023版)
线程和进程的区别、CAS的ABA问题、AQS、哪些地方使用了CAS、怎么保证线程安全、线程同步方式、synchronized的用法及原理、Lock、volatile、线程的六个状态、ThreadLocal、线程通信方式、创建方式、两种创建线程池的方法、线程池设置合适的线程数、线程安全的集合?ConcurrentHashMap、JUC
【Java面试题汇总】多线程、JUC、锁篇(2023版)
|
Java 程序员 调度
面试准备-并发
面试准备-并发
|
Java 调度 Android开发
Android面试题之Kotlin中async 和 await实现并发的原理和面试总结
本文首发于公众号“AntDream”,详细解析了Kotlin协程中`async`与`await`的原理及其非阻塞特性,并提供了相关面试题及答案。协程作为轻量级线程,由Kotlin运行时库管理,`async`用于启动协程并返回`Deferred`对象,`await`则用于等待该对象完成并获取结果。文章还探讨了协程与传统线程的区别,并展示了如何取消协程任务及正确释放资源。
381 0
|
消息中间件 Java 中间件
复盘女朋友面试4个月的并发面试题
该文章主要复盘了关于并发的面试题,包括线程池的使用场景、原理、参数合理化设置,以及ThreadLocal、volatile、synchronized关键字的使用场景和原理,还介绍了juc并发工具包中aqs的原理,强调在面试中要将自己理解的点与面试官讲透。
复盘女朋友面试4个月的并发面试题
|
JavaScript 前端开发 Java
面试官:假如有几十个请求,如何去控制并发?
面试官:假如有几十个请求,如何去控制并发?
|
Java 程序员 容器
【多线程面试题二十四】、 说说你对JUC的了解
这篇文章介绍了Java并发包java.util.concurrent(简称JUC),它是JSR 166规范的实现,提供了并发编程所需的基础组件,包括原子更新类、锁与条件变量、线程池、阻塞队列、并发容器和同步器等多种工具。
|
存储 Java
【IO面试题 四】、介绍一下Java的序列化与反序列化
Java的序列化与反序列化允许对象通过实现Serializable接口转换成字节序列并存储或传输,之后可以通过ObjectInputStream和ObjectOutputStream的方法将这些字节序列恢复成对象。
|
存储 算法 Java
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?
本文详解自旋锁的概念、优缺点、使用场景及Java实现。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:什么是自旋锁?Java 实现自旋锁的原理?