sharding jdbc多数据源同表结果查询问题

简介: 以往项目中,客户规定,各个部门的数据不能放在同一个数据库,导致办件服务存储不同部门的数据库在不同的数据源中,引发的多数据源查询的一系列问题

一、背景

      以往项目客户规定,各个部门的数据不能放在同一个数据库,导致办件服务存储不同部门的数据库在不同的数据源中,引发的多数据源查询的一系列问题

二、sharding jdbc 简介

     Sharding-JDBC是谷歌的一个开源的框架,提供标准化的数据分片、分布式事务和数据库治理功能,定位为轻量级Java框架,在Java的JDBC层提供的额外服务。 它使用客户端直连数据库,以jar包形式提供服务,无需额外部署和依赖,可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM框架。
适用于任何基于Java的ORM框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC。
基于任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, Druid, HikariCP等。
支持任意实现JDBC规范的数据库。目前支持MySQL,Oracle,SQLServer和PostgreSQL。

   三、项目融合sharding jdbc

     办件服务现有的是动态切换对应的数据源,key是部门的id,并不满足我们对一条sql语句就可以查询所有的部门数据的问题,所以要对现有有的逻辑进行改造

    首先我们要获取到所有的数据源的信息,初始化一份数据,如下图,这个是现有的存数据源信息的表dic_database_info

   

  然后我们要通过这张表获取所有的部门的数据源,并且初始化到线程池中,代码如下:

/*** @Description: 通过jdbc查询出数据连接信息  * @return* @date 2020-09-11 12:25* @throws*/privateMap<String, DataSource>getDepartmentDataSources (DataSourcerdsDataSource){
Stringsql="select id, department_id as departmentId, access_url as accessUrl,\n"+"        access_key as accessKey, access_value as accessValue\n"+"        from dic_database_info where status=1 order by id ";
Map<String, DataSource>dataSourceMap=newHashMap<>();
try {
PreparedStatementpreparedStatement=rdsDataSource.getConnection().prepareStatement(sql);
ResultSetresultSet=preparedStatement.executeQuery();
while (resultSet.next()){
StringdepartmentId=String.valueOf(resultSet.getLong("departmentId"));
StringaccessUrl=resultSet.getString("accessUrl");
StringaccessKey=resultSet.getString("accessKey");
StringaccessValue=resultSet.getString("accessValue");
accessValue=JasyptUtils.decrypt(accessValue,salt);
DataSourcedataSource=DataSourceBuilder.create().driverClassName(driverClassName)
                        .url(accessUrl)
                        .username(accessKey)
                        .password(accessValue)
                        .build();
StringserialNum=shardingSphereDataSourceSerialRecord.addDataSourceAlias(departmentId);
dataSourceMap.put(serialNum,dataSource);
            }
        }catch (SQLExceptione){
LOGGER.error("通过JDBC查询各局数据库信息出错:{}",e.getMessage());
        }
returndataSourceMap;
    }


   然后在maven引入包

<dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-core</artifactId><version>4.1.1</version></dependency>


由于办件服务只需要查询某一些表,所以我们把查询的表的表名配置在配置文件中

spring.shardingsphere.sharding.ext.tables=agent_materials,agents,applicant_materials,applicants,application_result_info,current_node,fee_unit_info,group_data_search,mark_results,node_result,paid_info,result_receive_info,service_application,service_attachment,service_correction,service_info_apply_nullify,service_info_nullify,tab_accept_affair_request,tab_apply_invalid_service_application,tab_apply_revoke_service_application,tab_get_node_info,tab_service_application_accepted,tab_service_application_result,tab_service_application_result_info


    然后在配置文件中获取

@ConfigurationProperties(prefix="spring.shardingsphere.sharding.ext")
publicstaticclassShardingJdbcProperties {
privateList<String>tables=Collections.emptyList();
publicList<String>getTables() {
returntables;
        }
publicvoidsetTables(List<String>tables) {
this.tables=tables;
        }
    }



     初始化配置文件

@OverridepublicvoidafterPropertiesSet() throwsException {
ShardingJdbcPropertiesshardingJdbcProperties=beanFactory.getBean(ShardingJdbcProperties.class);
if (shardingJdbcProperties!=null){
logicTables=newHashSet<>(shardingJdbcProperties.getTables());
initialized=true;
lock.lock();
try{
condition.signalAll();
            }finally {
lock.unlock();
            }
        }
    }
@OverridepublicvoidsetBeanFactory(BeanFactorybeanFactory) throwsBeansException {
this.beanFactory=beanFactory;
    }

 

  最后初始化sharding jdbc,并且保存数据源,表信息

@Bean(name="apiDataSource")
@ConfigurationProperties(prefix="spring.datasource.api")
publicDataSourcerdsDataSource() {
returnDataSourceBuilder.create().build();
    }
@Bean@PrimarypublicDataSourceshardingSphereDataSource(ShardingJdbcPropertiesshardingJdbcProperties, @Qualifier("apiDataSource") DataSourcedataSource) throwsSQLException {
finalStringMASTER_DATASOURCE_ALIAS="master";
ShardingRuleConfigurationshardingRuleConfig=newShardingRuleConfiguration();
shardingRuleConfig.setDefaultDatabaseShardingStrategyConfig(newHintShardingStrategyConfiguration(
newDepartmentDataSourceHintShardingAlgorithm()));
shardingRuleConfig.setDefaultTableShardingStrategyConfig(newHintShardingStrategyConfiguration(
newDepartmentDataSourceHintShardingAlgorithm.DepartmentTableHintShardingAlgorithm()));
Map<String, DataSource>fullDataSourceMap=newHashMap<>();
fullDataSourceMap.put(MASTER_DATASOURCE_ALIAS, dataSource);
Map<String, DataSource>departmentDatasourceMap=getDepartmentDataSources(dataSource);
if (departmentDatasourceMap!=null&&departmentDatasourceMap.size() >0 ){
fullDataSourceMap.putAll(departmentDatasourceMap);
        }
shardingRuleConfig.setDefaultDataSourceName(MASTER_DATASOURCE_ALIAS);
for (Stringtable : shardingJdbcProperties.getTables()){
TableRuleConfigurationtableRuleConfiguration=getDepartmentTableRuleConfiguration(table);
if (tableRuleConfiguration!=null){
shardingRuleConfig.getTableRuleConfigs().add(tableRuleConfiguration);
shardingRuleConfig.getBindingTableGroups().add(table);
            }
        }
Propertiesproperties=newProperties();
properties.put("sql.show", sqlShow);
returnShardingDataSourceFactory.createDataSource(fullDataSourceMap, shardingRuleConfig, properties);
    }
privateTableRuleConfigurationgetDepartmentTableRuleConfiguration (Stringtable) {
finalStringDS_TABLE_EXPRESSION="%s${0..%d}.%s";
if (StringUtils.isNotEmpty(table)){
Stringexpression=String.format(DS_TABLE_EXPRESSION, shardingSphereDataSourceSerialRecord.getDataSourcePrefix(),
shardingSphereDataSourceSerialRecord.getMaxSerialNum(), table);
returnnewTableRuleConfiguration(table, expression);
        }
returnnull;
    }
@Bean(name="apiSqlSessionFactory")
@PrimarypublicSqlSessionFactoryrdsSqlSessionFactory(@Qualifier("shardingSphereDataSource")DataSourcedynamicDataSource) throwsException {
SqlSessionFactoryBeansqlSessionFactory=newSqlSessionFactoryBean();
sqlSessionFactory.setDataSource(dynamicDataSource);
sqlSessionFactory.setConfigLocation(newClassPathResource("mybatis-config.xml"));
sqlSessionFactory.setMapperLocations(newPathMatchingResourcePatternResolver().getResources("classpath:mapper/**/*.xml"));
returnsqlSessionFactory.getObject();
    }
@Bean(name="apiTransactionManager")
@PrimarypublicDataSourceTransactionManagerrdsTransactionManager(@Qualifier("shardingSphereDataSource") DataSourcedynamicDataSource){
returnnewDataSourceTransactionManager(dynamicDataSource);
    }
@Bean(name="apiSqlSessionTemplate")
@PrimarypublicSqlSessionTemplaterdsSqlSessionTemplate(@Qualifier("apiSqlSessionFactory") SqlSessionFactoryrdsSqlSessionFactory){
returnnewSqlSessionTemplate(rdsSqlSessionFactory);
    }
@Bean(name="apiNamedParameterJdbcTemplate")
@PrimarypublicNamedParameterJdbcTemplaterdsNamedParameterJdbcTemplate(@Qualifier("shardingSphereDataSource") DataSourcedynamicDataSource){
returnnewNamedParameterJdbcTemplate(dynamicDataSource);
    }
publicstaticvoidcheckInitialize() throwsInterruptedException {
if (!initialized) {
lock.lock();
try{
condition.await(5000, TimeUnit.MILLISECONDS);
            }finally {
lock.unlock();
            }
        }
    }
publicstaticbooleancontainLogicTable(StringtableName){
try {
checkInitialize();
        } catch (InterruptedExceptione) {
        }
returnlogicTables==null?false : logicTables.contains(tableName);
    }
publicstaticSet<String>getLogicTables () {
try {
checkInitialize();
        } catch (InterruptedExceptione) {
        }
returnlogicTables==null?Collections.emptySet() : logicTables;
    }


 然后,封装一个类,对外提供切换数据源的接口

importlombok.extern.slf4j.Slf4j;
importorg.apache.commons.collections4.CollectionUtils;
importorg.apache.commons.lang3.ArrayUtils;
importorg.apache.commons.lang3.StringUtils;
importorg.apache.shardingsphere.api.hint.HintManager;
importjava.util.*;
@Slf4jpublicclassDynamicDataSourceContextHolder {
privatestaticfinalThreadLocal<String>CONTEXT_HOLDER=newThreadLocal<String>() {
/*** 将 master 数据源的 key作为默认数据源的 key*/@OverrideprotectedStringinitialValue() {
return"master";
        }
    };
/*** 数据源的 key集合,用于切换时判断数据源是否存在*/publicstaticList<Object>dataSourceKeys=newArrayList<>();
/*** 切换数据源* @param keys*/publicstaticvoidsetDataSourceKey(String ... keys) {
if (ArrayUtils.isNotEmpty(keys)){
log.info("调用数据源部门id为:"+Arrays.toString(keys));
Set<String>logicTables=ShardingJdbcConfig.getLogicTables();
HintManagerhintManager=HintManager.getInstance();
ShardingSphereDataSourceSerialRecordshardingSphereDataSourceSerialRecord=ShardingSphereDataSourceSerialRecord.getInstance();
if (CollectionUtils.isNotEmpty(logicTables)){
for (StringlogicTable : logicTables){
for (Stringkey : keys){
if (shardingSphereDataSourceSerialRecord.containDataSourceAliase(key)){
hintManager.addDatabaseShardingValue(logicTable, key);
                        }
                    }
                }
            }
CONTEXT_HOLDER.set(keys[0]);
        }else{
log.info("调用master数据源");
        }
    }
publicstaticvoidsetDataSourceKeyByDepartmentIds (StringtableName, String ... departmentIds){
if (ArrayUtils.isEmpty(departmentIds) ||StringUtils.isEmpty(tableName)){
return;
        }
HintManagerhintManager=HintManager.getInstance();
ShardingSphereDataSourceSerialRecordshardingSphereDataSourceSerialRecord=ShardingSphereDataSourceSerialRecord.getInstance();
for (StringdepartmentId : departmentIds){
if (!shardingSphereDataSourceSerialRecord.containDataSourceAliase(departmentId)){
continue;
            }
StringserialNum=shardingSphereDataSourceSerialRecord.getDataSourceSerialNum(departmentId);
hintManager.addDatabaseShardingValue(tableName, serialNum);
        }
    }
/*** 获取数据源* @return*/publicstaticStringgetDataSourceKey() {
returnCONTEXT_HOLDER.get();
    }
/*** 重置数据源*/publicstaticvoidclearDataSourceKey() {
HintManager.clear();
CONTEXT_HOLDER.remove();
    }
/*** 判断是否包含数据源* @param key 数据源key* @return*/publicstaticbooleancontainDataSourceKey(Stringkey) {
returndataSourceKeys.contains(key);
    }
/*** 添加数据源keys* @param keys* @return*/publicstaticbooleanaddDataSourceKeys(Collection<?extendsObject>keys) {
returndataSourceKeys.addAll(keys);
    }
}



      然后就可以通过DynamicDataSourceContextHolder.setDataSourceKey()切换数据源了,不同的数据源相同的表结果一条sql就可以搞定了

相关文章
|
SQL druid Java
【YashanDB知识库】YashanDB JDBC查询时抛出YAS-02094
【YashanDB知识库】YashanDB JDBC查询时抛出YAS-02094
|
SQL druid Java
【YashanDB知识库】YashanDB JDBC驱动查询时抛出io fail:Read timed out异常
【YashanDB知识库】YashanDB JDBC驱动查询时抛出io fail:Read timed out异常
|
SQL druid Java
【YashanDB知识库】YashanDB JDBC驱动查询时抛出io fail:Read timed out异常
【YashanDB知识库】YashanDB JDBC驱动查询时抛出io fail:Read timed out异常
|
SQL druid Java
【YashanDB知识库】YashanDB JDBC查询时抛出YAS-02094
【YashanDB知识库】YashanDB JDBC查询时抛出YAS-02094
|
Java 数据库连接 数据库
【YashanDB知识库】jdbc查询st_geometry类型的数据时抛出YAS-00101 cannot allocate 0 bytes for anlHeapMalloc异常
【YashanDB知识库】jdbc查询st_geometry类型的数据时抛出YAS-00101 cannot allocate 0 bytes for anlHeapMalloc异常
|
Java 数据库连接 数据库
【YashanDB 知识库】jdbc 查询 st_geometry 类型的数据时抛出 YAS-00101 cannot allocate 0 bytes for anlHeapMalloc 异常
**简介:** 客户在使用 YashanDB JDBC 驱动查询含 st_geometry 列的数据时,遇到 YAS-00101 错误,提示无法分配内存。该问题影响所有版本的 YashanDB,导致业务中断。原因是用户缺少对 st_geometry 类型的 execute 权限。解决方法是为用户赋权:`grant execute any type to &lt;username&gt;;` 以恢复正常运行。
|
Java 数据库连接 数据库
【YashanDB 知识库】jdbc 查询 st_geometry 类型的数据时抛出 YAS-00101 cannot allocate 0 bytes for anlHeapMalloc 异常
**问题简介:** 客户使用 YashanDB JDBC 驱动查询含 st_geometry 列的数据时,出现 YAS-00101 错误,提示无法分配 0 字节内存。该问题影响所有 YashanDB 版本,导致业务中断。原因是数据库用户缺少 st_geometry 类型的 execute 权限。解决方法是为用户赋权:`grant execute any type to &lt;username&gt;;`。
|
SQL Java 数据库连接
JDBC连接SQL Server2008 完成增加、删除、查询、修改等基本信息基本格式及示例代码
这篇文章提供了使用JDBC连接SQL Server 2008数据库进行增加、删除、查询和修改操作的基本步骤和示例代码。
|
SQL Java 关系型数据库
JDBC技术【SQL注入、JDBC批量添加数据、JDBC事务处理、其他查询方式】(三)-全面详解(学习总结---从入门到深化)
JDBC技术【SQL注入、JDBC批量添加数据、JDBC事务处理、其他查询方式】(三)-全面详解(学习总结---从入门到深化)
430 0
|
Java 关系型数据库 MySQL
②⑩② 【读写分离】Sharding - JDBC 实现 MySQL读写分离[SpringBoot框架]
②⑩② 【读写分离】Sharding - JDBC 实现 MySQL读写分离[SpringBoot框架]
341 0
下一篇
开通oss服务