引入依赖
<dependency> <groupId>com.baomidou</groupId> <artifactId>dynamic-datasource-spring-boot-starter</artifactId> <version>3.5.1</version> </dependency>
跟mybatis-plus属于同一个开源组织 苞米豆
配置文件
spring: datasource: dynamic: primary: master # 严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源 strict: true datasource: master: url: jdbc:mysql://192.168.101.128:3307/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai username: root password: 123456 slave: url: jdbc:mysql://192.168.101.128:3308/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai username: root password: 123456 # 按需开启日志 logging: level: com.baomidou.dynamic: debug
方法或类上加上@DS
注解即可切换数据源
该框架获取数据库连接的核心逻辑是以下这段
在被DS注解标记的方法上, 会被此拦截器拦截, 获取到注解上定义的值, 并存入栈结构中
com.baomidou.dynamic.datasource.aop.DynamicDataSourceAnnotationInterceptor
public Object invoke(MethodInvocation invocation) throws Throwable { String dsKey = determineDatasourceKey(invocation); //获取注解值入栈 DynamicDataSourceContextHolder.push(dsKey); try { return invocation.proceed(); } finally { //方法结束后出栈 DynamicDataSourceContextHolder.poll(); } }
然后com.baomidou.dynamic.datasource.ds.AbstractRoutingDataSource#getConnection()
public Connection getConnection() throws SQLException { String xid = TransactionContext.getXID(); if (StringUtils.isEmpty(xid)) { return determineDataSource().getConnection(); } else { //获取栈顶的一个值 String ds = DynamicDataSourceContextHolder.peek(); ds = StringUtils.isEmpty(ds) ? "default" : ds; //根据值获取对应数据库的连接 ConnectionProxy connection = ConnectionFactory.getConnection(ds); return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection; } }
可以发现框架是通过维护一个栈结构进行对应数据源的切换, 类似方法的栈, 因为方法间可能嵌套调用, 所以使用此结构便于管理
但Spring的@Transactional
会影响@DS
例如
@Autowired @Lazy CurrentService currentService; @DS("master") @Transactional public void updateUser() { baseMapper.updateById(user); System.out.println(currentService.get()); } @DS("slave") public User get() { return baseMapper.selectById(1); }
在这里, master和slave是使用binlog搭建的读写分离架构
但实际get方法却能读取到updateUser所做的修改, 通过Debug也能看到真正的数据库连接属性, get方法还是使用的master库
因为在Spring管理下, 获取到数据库连接后, 会和当前线程进行绑定, 如果后面的方法被判断为不需要新建连接, 则复用之前与线程绑定的连接, 那么即使有DS注解
, 也切换不了库
如何判断需不需要新建连接? 看被调用方法是否定义了事务传播属性.
在org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
方法中
if (isExistingTransaction(transaction)) { //找到现有事务 -> 检查传播行为以了解行为方式 //当前已经存在一个事务 return handleExistingTransaction(def, transaction, debugEnabled); }
继续调用org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction
//判断当前方法的隔离属性是否为PROPAGATION_REQUIRES_NEW if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } }
继续调用org.springframework.transaction.support.AbstractPlatformTransactionManager#startTransaction
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; }
继续调用org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { //获取当前的数据源, 此处才能让@DS注解生效 Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } //将数据库连接绑定到事务 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); }
所以如果想让get方法读取从库, 则需要定义传播属性以便让Spring建立新连接
第二种方法就是使用该框架的@DSTransactional
该方法也会进行事务管理, 但功能比较简陋
这个注解还提供了一个本地事务
的功能: 解决多数据源的事务问题.
但这个功能也有问题, 不建议使用
看这个方法 com.baomidou.dynamic.datasource.tx.ConnectionFactory#notify
public static void notify(Boolean state) { try { Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get(); //获取当前线程所有的数据库连接, 通知其进行回滚/提交, 可能存在某一个事务提交成功, 某一事务提交失败. //并不能保证最终一致性 for (ConnectionProxy connectionProxy : concurrentHashMap.values()) { connectionProxy.notify(state); } } finally { CONNECTION_HOLDER.remove(); } }
对于这种多库事务, 建议使用Seata或消息队列
贴心的是, 框架还与Seata进行了整合
引入依赖
<dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>1.4.2</version> </dependency>
spring: datasource: dynamic: #seata1.0之后支持自动代理 这里直接配置true seata: true