SpringBoot+ShardingSphere5.1.2+mybatisPlus实现自动分库分表 自动创建表(分表条件有英文字符 无法通过**匹配)
ShardingSphere 是 Apache 旗下的开源项目,遵循 Database Plus 理念,可以将任意数据库转换为分布式数据库系统。
文章共2,021字 · 阅读需要大约7分钟
一键AI生成摘要,助你高效阅读
问答
·
ShardingSphere 是 Apache 旗下的开源项目,遵循 Database Plus 理念,可以将任意数据库转换为分布式数据库系统。
引入依赖
ShardingSphere5.1.2+mybatisPlus+mysql
<!-- MyBatis-Plus Spring Boot starter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.3.1</version>
</dependency>
<!-- MySQL Connector/J JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
<!-- ShardingSphere-JDBC Spring Boot starter -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>5.1.2</version>
</dependency>
<!-- Spring Data JPA starter; the version is taken from the spring.boot.version property -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<!-- SnakeYAML, used to read the YAML configuration -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.33</version>
</dependency>
修改配置
从一般数据库配置到ShardingSphere的配置
spring:
  # ShardingSphere-JDBC main configuration; the data source uses the HikariDataSource pool
  shardingsphere:
    # Whether sharding is enabled
    #enabled: true
    props:
      # Whether to log the SQL that is executed
      sql-show: true
    datasource:
      # Data source names, comma-separated. The name must match the prefix used in
      # actual-data-nodes below and the key of the data source definition that follows.
      names: iot
      iot:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3366/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=false
        username: root
        password: 123123
    rules:
      sharding:
        tables:
          wssl_terminal_data_b: # sharded table; this key is the logic table name
            # List the initial table here; further tables are created later
            # (when new tenants are added) and the nodes are refreshed.
            actual-data-nodes: iot.terminal_data
            table-strategy: # table sharding strategy
              complex: # complex strategy: shards on several columns at once
                sharding-columns: terminal_type,create_time
                sharding-algorithm-name: real-data-inline
        # Sharding algorithm definitions
        sharding-algorithms:
          real-data-inline: # sharding algorithm name
            type: CLASS_BASED # custom, class-based algorithm
            props:
              strategy: complex
              # Fully qualified class name (package + class)
              algorithmClassName: com.qzsoft.common.config.shardingsphere.DataShardingAlgorithm
添加表名转换的类 com.qzsoft.common.config.shardingsphere.DataShardingAlgorithm
通过自定义的逻辑找到需要查询的表;表不存在时可以自动创建表。
/**
 * Complex-key table sharding algorithm driven by the {@code terminal_type} and
 * {@code create_time} columns.
 *
 * <p>Physical tables are named {@code <logicTable>_<terminalType>_<year>_<bucket>} in lower case,
 * where {@code <year>_<bucket>} is produced by {@link #getTimeStr(Date)}. When both sharding
 * columns are supplied as exact values, the single matching table is returned and created on
 * demand. Otherwise the currently known tables are narrowed down by whatever values are present.
 */
@Slf4j
@Component
public class DataShardingAlgorithm implements ComplexKeysShardingAlgorithm<String> {

    private static final String TERMINAL_TYPE_COLUMN = "terminal_type";
    private static final String CREATE_TIME_COLUMN = "create_time";

    /** Properties received through {@link #init(Properties)}; empty until then. */
    private Properties props = new Properties();

    /**
     * Resolves the physical tables a statement must be routed to.
     *
     * @param collection               the table names currently known for the logic table; a newly
     *                                 created table is added to it
     * @param complexKeysShardingValue the logic table name plus the exact and range sharding values
     * @return the single target table when both sharding columns have exact values, otherwise the
     *         known tables that are compatible with the supplied values
     */
    @Override
    public Collection<String> doSharding(Collection<String> collection, ComplexKeysShardingValue<String> complexKeysShardingValue) {
        log.info("当前存在的表有:"+collection);
        String logicTableName = complexKeysShardingValue.getLogicTableName();
        Map<String, Collection<String>> columnNameAndShardingValuesMap = complexKeysShardingValue.getColumnNameAndShardingValuesMap();
        Map<String, Range<String>> columnNameAndRangeValuesMap = complexKeysShardingValue.getColumnNameAndRangeValuesMap();
        log.info("传入的参数列表:"+columnNameAndShardingValuesMap);
        log.info("传入的参数列表:"+columnNameAndRangeValuesMap);

        // Only the first exact value of each column is considered. The values are read through a
        // wildcard collection because create_time is cast to Date even though the map is typed
        // with String elements.
        String type = (String) firstOrNull(columnNameAndShardingValuesMap.get(TERMINAL_TYPE_COLUMN));
        Date date = (Date) firstOrNull(columnNameAndShardingValuesMap.get(CREATE_TIME_COLUMN));
        @SuppressWarnings({"unchecked", "rawtypes"})
        Range<Date> rangeTime = (Range) columnNameAndRangeValuesMap.get(CREATE_TIME_COLUMN);

        if (type == null || date == null) {
            List<String> candidates = new ArrayList<>(collection);
            if (type != null) {
                // Table names are built in lower case, so the comparison must ignore case;
                // otherwise a type containing upper-case letters would filter out every table.
                String typeToken = type.toLowerCase();
                candidates.removeIf(table -> !table.toLowerCase().contains(typeToken));
            }
            if (date != null) {
                String suffix = getTimeStr(date);
                candidates.removeIf(table -> !table.endsWith(suffix));
            }
            // An open-ended range has no endpoint to read (the endpoint accessors would throw),
            // so in that case no time filter is applied and all remaining tables are kept.
            if (rangeTime != null && rangeTime.hasLowerBound() && rangeTime.hasUpperBound()) {
                Set<String> suffixes = getTimeStrList(rangeTime.lowerEndpoint(), rangeTime.upperEndpoint());
                candidates.removeIf(table -> suffixes.stream().noneMatch(table::endsWith));
            }
            log.info("当前使用的表名" + candidates);
            return candidates;
        }

        // Table name format: <logicTable>_<terminalType>_<year>_<bucket>, lower-cased.
        String newTableName = (logicTableName + "_" + type + "_" + getTimeStr(date)).toLowerCase();
        log.info("表名为:" + newTableName);
        Integer num = ShardingAlgorithmTool.getTable(newTableName);
        if (num < 1) {
            // The table does not exist yet: create it as a copy of the logic table.
            ShardingAlgorithmTool.copyTable(logicTableName, newTableName);
        }
        if (!collection.contains(newTableName)) {
            // Register the table so that it is known from now on.
            log.info("动态添加");
            collection.add(newTableName);
        }
        log.info("当前使用的表名" + newTableName);
        return Collections.singletonList(newTableName);
    }

    /**
     * Builds the {@code <year>_<bucket>} suffix for a date.
     *
     * <p>The bucket is {@code 1} when {@code DateUtil.month(date) + 1} is below 6 and {@code 6}
     * otherwise (assuming {@code DateUtil.month} is zero-based, that is January–May versus
     * June–December).
     *
     * @param date the date to classify
     * @return the table-name suffix, for example {@code 2023_1}
     */
    public String getTimeStr(Date date){
        int year = DateUtil.year(date);
        int month = DateUtil.month(date) + 1;
        month = month < 6 ? 1 : 6;
        return year + "_" + month;
    }

    /**
     * Collects the suffixes of every bucket touched by the interval from {@code lowDate} to
     * {@code upDate}.
     *
     * <p>The interval is walked one month at a time. The buckets are not all six months long, so
     * a six-month step can jump from December straight to June and miss the {@code _1} bucket in
     * between.
     *
     * @param lowDate the start of the interval
     * @param upDate  the end of the interval; its bucket is always included
     * @return the distinct suffixes produced by {@link #getTimeStr(Date)} over the interval
     */
    public Set<String> getTimeStrList(Date lowDate, Date upDate){
        Set<String> set = new HashSet<>();
        Date cursor = lowDate;
        while (cursor.before(upDate)) {
            set.add(getTimeStr(cursor));
            cursor = DateUtil.offsetMonth(cursor, 1);
        }
        set.add(getTimeStr(upDate));
        return set;
    }

    /**
     * @return always {@code null}; this algorithm is referenced by class name in the configuration
     */
    @Override
    public String getType() {
        return null;
    }

    /**
     * @return the properties received through {@link #init(Properties)}, or an empty set if
     *         {@code init} has not been called
     */
    @Override
    public Properties getProps() {
        return props;
    }

    /**
     * Remembers the configured properties. The known table nodes could also be cached here and
     * served from that cache afterwards.
     *
     * @param properties the algorithm properties; {@code null} is ignored
     */
    @Override
    public void init(Properties properties) {
        if (properties != null) {
            this.props = properties;
        }
    }

    /** Returns the first element of {@code values}, or {@code null} when it is null or empty. */
    private static Object firstOrNull(Collection<?> values) {
        return values == null || values.isEmpty() ? null : values.iterator().next();
    }
}
ShardingAlgorithmTool 操作表
/**
 * Static helper used by the sharding algorithm to look up physical tables and to create new ones.
 */
@Slf4j
public class ShardingAlgorithmTool {

    /** Mapper bean, fetched from {@code BeanUtil} when this class is initialised. */
    private static DataShardingMapper mapper = BeanUtil.getBean(DataShardingMapper.class);

    /**
     * Creates {@code newTable} through {@link DataShardingMapper#copyTables(String, String)}.
     *
     * @param originalTable the table handed to the mapper as the template
     * @param newTable      the name of the table to create
     */
    public static void copyTable(String originalTable, String newTable) {
        mapper.copyTables(originalTable, newTable);
    }

    /**
     * Looks up how many tables carry the given name.
     *
     * @param table the table name to look for
     * @return the count reported by {@code JDBCUtils.getTableNumByTableName}
     */
    public static Integer getTable(String table) {
        int matches = JDBCUtils.getTableNumByTableName(table);
        return matches;
    }
}
DataShardingMapper 复制表
/**
 * MyBatis mapper holding the table-level statements used for sharding.
 */
@Mapper
public interface DataShardingMapper extends BaseMapper {

    /**
     * Creates {@code newTable} with the same definition as {@code originalTable}.
     * Both names are substituted into the statement text with {@code ${}}, so they must come
     * from trusted code and never from user input.
     *
     * @param originalTable the table whose definition is copied
     * @param newTable      the name of the table to create
     */
    @Update("CREATE TABLE ${newTable} ( LIKE ${originalTable});")
    void copyTables(@Param("originalTable") String originalTable, @Param("newTable") String newTable);

    /**
     * Checks whether a table exists.
     * NOTE(review): no SQL annotation is declared here, so the statement is presumably mapped
     * elsewhere (for example in an XML mapper) - confirm.
     *
     * @param table the table name to check
     * @return the result of the mapped statement
     */
    String checkTableExistsWithShow(@Param("table") String table);

    // Disabled alternative that counted matching rows in information_schema.TABLES:
    //@Select("select count(1) from information_schema.TABLES where TABLE_NAME = #{tableName} ")
    //Integer getTablesNumByName(@Param("tableName") String tableName);
}
部分语句(例如查询 information_schema)不能在 ShardingSphere 中执行,这里采用折中的解决方法:通过原生 JDBC 直接查询。如果有其他方法,欢迎一起讨论。
package com.qzsoft.common.utils;
import com.alibaba.fastjson2.JSONObject;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.yaml.snakeyaml.Yaml;
@Slf4j
public class JDBCUtils {
private static String url;
private static String user;
private static String password;
private static String driver;
/**
* 文件的读取,只需要读取一次即可拿到这些值。使用静态代码块
*/
static {
//读取资源文件,获取值。
try {
Yaml pro = new Yaml();
//获取src路径下的文件的方式--->ClassLoader 类加载器
ClassLoader classLoader = JDBCUtils.class.getClassLoader();
URL res = classLoader.getResource("application-common.yml");
String path = res.getPath();
JSONObject map = pro.loadAs(new FileReader(path), JSONObject.class);
//获取数据,赋值
JSONObject jsonObject = map.getJSONObject("spring").getJSONObject("shardingsphere").getJSONObject("datasource").getJSONObject("iot");
url = jsonObject.getString("url");
user = jsonObject.getString("username");
password = jsonObject.getString("password");
driver = "com.mysql.jdbc.Driver";
//4. 注册驱动
Class.forName(driver);
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static Connection getConnection() throws SQLException {
return DriverManager.getConnection(url, user, password);
}
public static void close(PreparedStatement preparedStatement, Connection connection) {
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
public static int getTableNumByTableName(String tableName){
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = getConnection();
preparedStatement = connection.prepareStatement("select count(1) as num from information_schema.TABLES where TABLE_NAME = '" + tableName + "'");
final ResultSet resultSet = preparedStatement.executeQuery();
if (resultSet != null) {
while (resultSet.next()) {
return resultSet.getInt("num");
}
}
} catch (SQLException e) {
log.error("查询失败!");
e.printStackTrace();
if (connection != null) {
try {
connection.rollback();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
} finally {
close(preparedStatement, connection);
}
return 0;
}
public static List<String> getTableNamesByTableName(String tableName){
List<String> tableNames = new ArrayList<>();
Connection connection = null;
PreparedStatement preparedStatement = null;
try {
connection = getConnection();
preparedStatement = connection.prepareStatement("SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME like '" +tableName+ "%' and table_schema = 'test'");
final ResultSet resultSet = preparedStatement.executeQuery();
if (resultSet != null) {
while (resultSet.next()) {
final String name = resultSet.getString("TABLE_NAME");
tableNames.add(name);
}
}
} catch (SQLException e) {
e.printStackTrace();
if (connection != null) {
try {
connection.rollback();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
} finally {
close(preparedStatement, connection);
}
return tableNames;
}
}
添加初始化表名索引的类:如果不加这个类,操作表时无法使用已经存在的表。
其主要目的是查询出与分表同名前缀的物理表,然后放入索引(数据节点)中。
@Component
@Slf4j
@RequiredArgsConstructor
public class AutoConfigDateNodes implements ApplicationRunner {
@Resource
private Environment environment;
@Resource
private final DataSource dataSource;
@Override
public void run(ApplicationArguments args) {
log.info("进行自动更新sharding中物理表配置");
refreshShardingNodes((ShardingSphereDataSource) dataSource);
}
private void refreshShardingNodes(ShardingSphereDataSource ds) {
// 获取context信息
ContextManager contextManager = getContextManager(ds);
Integer day = null;
Collection<RuleConfiguration> addRuleConfigs = new LinkedList<>();
//获取配置的分片信息
Collection<RuleConfiguration> configurations = contextManager.getMetaDataContexts().getMetaData().getDatabases()
.get("logic_db").getRuleMetaData().getConfigurations();
a:for (RuleConfiguration configuration : configurations) {
//处理分片信息
AlgorithmProvidedShardingRuleConfiguration algorithmProvidedShardingRuleConfiguration = (AlgorithmProvidedShardingRuleConfiguration) configuration;
Map<String, ShardingAlgorithm> shardingAlgorithms = algorithmProvidedShardingRuleConfiguration
.getShardingAlgorithms();
AlgorithmProvidedShardingRuleConfiguration addRuleConfiguration = new AlgorithmProvidedShardingRuleConfiguration();
Collection<ShardingTableRuleConfiguration> addTableConfigurations = new LinkedList<>();
//获取noed列表配置
for (ShardingTableRuleConfiguration shardingTableRuleConfiguration : algorithmProvidedShardingRuleConfiguration
.getTables()) {
//处理node信息,建表,自动刷新node都在此方法处理
List<ShardingTableRuleConfiguration> tableRule = createTableRule(shardingTableRuleConfiguration, shardingAlgorithms);
if (tableRule.isEmpty()) continue a;
addTableConfigurations.addAll(tableRule);
}
addRuleConfiguration.setTables(addTableConfigurations);
addRuleConfiguration.setAutoTables(algorithmProvidedShardingRuleConfiguration.getAutoTables());
addRuleConfiguration
.setBindingTableGroups(algorithmProvidedShardingRuleConfiguration.getBindingTableGroups());
addRuleConfiguration.setBroadcastTables(algorithmProvidedShardingRuleConfiguration.getBroadcastTables());
addRuleConfiguration.setDefaultDatabaseShardingStrategy(
algorithmProvidedShardingRuleConfiguration.getDefaultDatabaseShardingStrategy());
addRuleConfiguration.setDefaultTableShardingStrategy(
algorithmProvidedShardingRuleConfiguration.getDefaultTableShardingStrategy());
addRuleConfiguration.setDefaultKeyGenerateStrategy(
algorithmProvidedShardingRuleConfiguration.getDefaultKeyGenerateStrategy());
addRuleConfiguration
.setDefaultShardingColumn(algorithmProvidedShardingRuleConfiguration.getDefaultShardingColumn());
addRuleConfiguration.setShardingAlgorithms(shardingAlgorithms);
addRuleConfiguration.setKeyGenerators(algorithmProvidedShardingRuleConfiguration.getKeyGenerators());
addRuleConfigs.add(addRuleConfiguration);
}
contextManager.alterRuleConfiguration("logic_db", addRuleConfigs);
//将新数据添加进contex中
setContextManager(ds, contextManager);
}
private ContextManager getContextManager(ShardingSphereDataSource dataSource) {
try {
Field contextManagerField = dataSource.getClass().getDeclaredField("contextManager");
contextManagerField.setAccessible(true);
return (ContextManager) contextManagerField.get(dataSource);
}
catch (Exception e) {
throw new CustomException(100500, "系统异常");
}
}
private void setContextManager(ShardingSphereDataSource dataSource, ContextManager manager) {
try {
Field contextManagerField = dataSource.getClass().getDeclaredField("contextManager");
contextManagerField.setAccessible(true);
contextManagerField.set(dataSource, manager);
}
catch (Exception e) {
throw new CustomException(100500, "系统异常");
}
}
private List<ShardingTableRuleConfiguration> createTableRule(
ShardingTableRuleConfiguration shardingTableRuleConfiguration,
Map<String, ShardingAlgorithm> shardingAlgorithms) {
List<ShardingTableRuleConfiguration> list = new ArrayList<>();
ShardingAlgorithm shardingAlgorithm = shardingAlgorithms
.get(shardingTableRuleConfiguration.getTableShardingStrategy().getShardingAlgorithmName());
// 获取当前分表使用的自定义类的全路径
String algorithmClassName = shardingAlgorithm.getProps().getProperty("algorithmClassName");
if (StringUtils.isBlank(algorithmClassName)) {
return list;
}
// 通过反射执行‘分表自定义类’的构建物理表方法
String logicTable = shardingTableRuleConfiguration.getLogicTable();
//查询需要使用的表名
List<String> tableNames = JDBCUtils.getTableNamesByTableName(logicTable);
//必须要返回一个 不然对应的表不会走 sharding
if (tableNames.size() != 1) {
tableNames.remove(logicTable);
}
if (tableNames.isEmpty()) {
return list;
}
System.out.println("使用的全部表名" + tableNames);
String[] split = environment.getProperty("spring.shardingsphere.datasource.names").split("\\.");
for (String DBName : split) {
ShardingTableRuleConfiguration addTableConfiguration = new ShardingTableRuleConfiguration(logicTable,
DBName+"." + StringUtils.join(tableNames, "," + DBName+"."));
addTableConfiguration.setTableShardingStrategy(shardingTableRuleConfiguration.getTableShardingStrategy());
addTableConfiguration.setDatabaseShardingStrategy(shardingTableRuleConfiguration.getDatabaseShardingStrategy());
addTableConfiguration.setKeyGenerateStrategy(shardingTableRuleConfiguration.getKeyGenerateStrategy());
list.add(addTableConfiguration);
}
return list;
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)