ShardingSphere 是 Apache 旗下的开源项目,遵循 Database Plus 理念,可以将任意数据库转换为分布式数据库系统。

引入依赖

ShardingSphere 5.1.2 + MyBatis-Plus + MySQL

            <!-- MyBatis-Plus Spring Boot starter -->
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
                <version>3.5.3.1</version>
            </dependency>
            <!-- MySQL JDBC driver -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.30</version>
            </dependency>
            <!-- ShardingSphere-JDBC Spring Boot starter -->
            <dependency>
                <groupId>org.apache.shardingsphere</groupId>
                <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
                <version>5.1.2</version>
            </dependency>
            <!-- Spring Data JPA starter; the version comes from the spring.boot.version property -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jpa</artifactId>
                <version>${spring.boot.version}</version>
            </dependency>
            <!-- YAML configuration parsing -->
            <dependency>
                <groupId>org.yaml</groupId>
                <artifactId>snakeyaml</artifactId>
                <version>1.33</version>
            </dependency>

修改配置

从一般数据库配置到ShardingSphere的配置

spring:
  # Main ShardingSphere-JDBC configuration; the data source uses the HikariDataSource pool
  shardingsphere:
    # Whether sharding is enabled
    #enabled: true
    props:
      # Print the routed SQL statements
      sql-show: true
    datasource:
      # Data source names; separate multiple names with commas.
      # The name must match the one used in actual-data-nodes below and the
      # datasource key read by JDBCUtils ("iot").
      names: iot
      iot:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3366/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=false
        username: root
        password: 123123
    rules:
      sharding:
        tables:
          wssl_terminal_data_b: # sharded table: logic table name
            # Seed the node list with an initial table; further physical tables are
            # created later and the node list is refreshed at runtime.
            # NOTE(review): the seed table name differs from the logic table name — confirm it exists.
            actual-data-nodes: iot.terminal_data
            table-strategy: # table sharding strategy
              complex: # complex strategy: sharding on several sharding columns
                sharding-columns: terminal_type,create_time
                sharding-algorithm-name: real-data-inline
        # Sharding algorithm definitions
        sharding-algorithms:
          real-data-inline: # sharding algorithm name
            type: CLASS_BASED # custom, class-based algorithm
            props:
              strategy: complex
              # Fully-qualified class name (package + class)
              algorithmClassName: com.qzsoft.common.config.shardingsphere.DataShardingAlgorithm

添加表名转换的类 com.qzsoft.common.config.shardingsphere.DataShardingAlgorithm

通过自定义的逻辑找到需要查询的表;表不存在时可以自动创建表。

/**
 * Complex-keys sharding algorithm that routes a logic table to physical tables named
 * {@code <logicTable>_<terminal_type>_<year>_<bucket>} (lower-cased).
 *
 * <p>When both sharding columns carry exact values, the matching physical tables are
 * resolved (and created on demand through {@code ShardingAlgorithmTool}). Otherwise the
 * currently known tables are filtered by whatever conditions are available.
 */
@Slf4j
@Component
public class DataShardingAlgorithm implements ComplexKeysShardingAlgorithm<String> {

    /** Sharding column holding the terminal type. */
    private static final String TERMINAL_TYPE_COLUMN = "terminal_type";

    /** Sharding column holding the creation time. */
    private static final String CREATE_TIME_COLUMN = "create_time";

    /** Properties received through {@link #init(Properties)}; never {@code null}. */
    private Properties props = new Properties();

    /**
     * Resolves the physical tables for one sharding condition.
     *
     * @param collection               physical table names currently known to ShardingSphere;
     *                                 newly created tables are appended to it
     * @param complexKeysShardingValue exact and range values of the sharding columns
     * @return the physical table names the statement must be routed to
     */
    @Override
    public Collection<String> doSharding(Collection<String> collection, ComplexKeysShardingValue<String> complexKeysShardingValue) {
        log.info("当前存在的表有:{}", collection);
        String logicTableName = complexKeysShardingValue.getLogicTableName();
        Map<String, Collection<String>> columnNameAndShardingValuesMap = complexKeysShardingValue.getColumnNameAndShardingValuesMap();
        Map<String, Range<String>> columnNameAndRangeValuesMap = complexKeysShardingValue.getColumnNameAndRangeValuesMap();
        log.info("传入的参数列表:{}", columnNameAndShardingValuesMap);
        log.info("传入的参数列表:{}", columnNameAndRangeValuesMap);

        // The declared element type is String, but create_time values are cast to Date below,
        // so the collections are handled as Collection<?> to avoid hidden casts.
        Collection<?> terminalTypes = nullIfEmpty(columnNameAndShardingValuesMap.get(TERMINAL_TYPE_COLUMN));
        Collection<?> createTimes = nullIfEmpty(columnNameAndShardingValuesMap.get(CREATE_TIME_COLUMN));
        @SuppressWarnings("unchecked") // same assumption as above: range endpoints are Dates
        Range<Date> rangeTime = (Range<Date>) (Range<?>) columnNameAndRangeValuesMap.get(CREATE_TIME_COLUMN);

        if (terminalTypes == null || createTimes == null) {
            return filterKnownTables(collection, terminalTypes, createTimes, rangeTime);
        }

        // Both columns have exact values: route to every (type, time bucket) combination so
        // that IN (...) conditions with several values are not silently truncated.
        Set<String> targets = new HashSet<>();
        for (Object type : terminalTypes) {
            for (Object time : createTimes) {
                String newTableName = (logicTableName + "_" + type + "_" + getTimeStr((Date) time)).toLowerCase();
                if (targets.add(newTableName)) {
                    ensureTableRegistered(collection, logicTableName, newTableName);
                }
            }
        }
        return new ArrayList<>(targets);
    }

    /**
     * Narrows the known tables using whichever of the conditions is present.
     * A range that is open on either side cannot be enumerated and therefore does not filter.
     */
    private List<String> filterKnownTables(Collection<String> knownTables, Collection<?> terminalTypes,
                                           Collection<?> createTimes, Range<Date> rangeTime) {
        List<String> candidates = new ArrayList<>(knownTables);
        if (terminalTypes != null) {
            candidates.removeIf(table -> !containsAnyType(table, terminalTypes));
        }
        if (createTimes != null) {
            Set<String> suffixes = new HashSet<>();
            for (Object time : createTimes) {
                suffixes.add(getTimeStr((Date) time));
            }
            candidates.removeIf(table -> !endsWithAny(table, suffixes));
        }
        if (rangeTime != null && rangeTime.hasLowerBound() && rangeTime.hasUpperBound()) {
            Set<String> suffixes = getTimeStrList(rangeTime.lowerEndpoint(), rangeTime.upperEndpoint());
            candidates.removeIf(table -> !endsWithAny(table, suffixes));
        }
        log.info("当前使用的表名{}", candidates);
        return candidates;
    }

    /** Creates the physical table if it is missing and appends it to the known table names. */
    private void ensureTableRegistered(Collection<String> knownTables, String logicTableName, String newTableName) {
        log.info("表名为:{}", newTableName);
        Integer num = ShardingAlgorithmTool.getTable(newTableName);
        if (num < 1) {
            // The table does not exist yet: create it with the structure of the logic table.
            ShardingAlgorithmTool.copyTable(logicTableName, newTableName);
        }
        if (!knownTables.contains(newTableName)) {
            // Register the new node dynamically.
            log.info("动态添加");
            knownTables.add(newTableName);
        }
        log.info("当前使用的表名{}", newTableName);
    }

    /** Case-insensitive substring match, because physical table names are created lower-cased. */
    private static boolean containsAnyType(String table, Collection<?> terminalTypes) {
        String lowerTable = table.toLowerCase();
        for (Object type : terminalTypes) {
            if (lowerTable.contains(String.valueOf(type).toLowerCase())) {
                return true;
            }
        }
        return false;
    }

    /** Returns whether {@code table} ends with at least one of the given suffixes. */
    private static boolean endsWithAny(String table, Set<String> suffixes) {
        for (String suffix : suffixes) {
            if (table.endsWith(suffix)) {
                return true;
            }
        }
        return false;
    }

    /** Treats an empty value collection like an absent one. */
    private static Collection<?> nullIfEmpty(Collection<?> values) {
        return values == null || values.isEmpty() ? null : values;
    }

    /**
     * Builds the time suffix of a physical table: {@code <year>_1} for months 1-5 and
     * {@code <year>_6} for months 6-12 ({@code DateUtil.month} is treated as zero-based here).
     *
     * @param date the creation time
     * @return the suffix, e.g. {@code 2023_6}
     */
    public String getTimeStr(Date date) {
        int year = DateUtil.year(date);
        int month = DateUtil.month(date) + 1;
        int bucket = month < 6 ? 1 : 6;
        return year + "_" + bucket;
    }

    /**
     * Collects the time suffixes of every bucket touched by the closed interval
     * {@code [lowDate, upDate]}.
     *
     * <p>The interval is walked month by month: the buckets are 5 and 7 months long, so a
     * 6-month stride could jump over the 5-month bucket.
     *
     * @param lowDate lower endpoint
     * @param upDate  upper endpoint
     * @return the distinct suffixes; always contains the suffix of {@code upDate}
     */
    public Set<String> getTimeStrList(Date lowDate, Date upDate) {
        Set<String> suffixes = new HashSet<>();
        Date cursor = lowDate;
        while (cursor.before(upDate)) {
            suffixes.add(getTimeStr(cursor));
            cursor = DateUtil.offsetMonth(cursor, 1);
        }
        suffixes.add(getTimeStr(upDate));
        return suffixes;
    }

    /**
     * @return {@code null}; the class is referenced by its class name through a
     *         {@code CLASS_BASED} algorithm in the configuration
     */
    @Override
    public String getType() {
        return null;
    }

    /** @return the properties passed to {@link #init(Properties)}, or empty properties */
    @Override
    public Properties getProps() {
        return props;
    }

    /**
     * Stores the algorithm properties. The node list could additionally be cached here.
     *
     * @param properties algorithm properties, may be {@code null}
     */
    @Override
    public void init(Properties properties) {
        this.props = properties == null ? new Properties() : properties;
    }
}

ShardingAlgorithmTool 操作表

/**
 * Static helpers used by the sharding algorithm to look up and create physical tables.
 */
@Slf4j
public class ShardingAlgorithmTool {

    /**
     * Characters accepted in a table name before it is spliced into DDL: ASCII letters,
     * digits, {@code _}, {@code $} and non-ASCII BMP characters.
     */
    private static final String SAFE_IDENTIFIER = "[0-9A-Za-z_$\\u0080-\\uFFFF]+";

    /**
     * Counts the tables with the given name.
     *
     * @param table physical table name
     * @return the number of matching tables as reported by {@code JDBCUtils}
     */
    public static Integer getTable(String table) {
        return JDBCUtils.getTableNumByTableName(table);
    }

    /**
     * Creates {@code newTable} with the structure of {@code originalTable}.
     *
     * <p>The mapper is resolved on each call rather than in a static initialiser, so loading
     * this class does not depend on the bean container being ready.
     *
     * @param originalTable template table
     * @param newTable      table to create
     * @throws IllegalArgumentException if either name contains characters outside
     *                                  {@link #SAFE_IDENTIFIER}; the names end up verbatim in SQL
     */
    public static void copyTable(String originalTable, String newTable) {
        requireSafeIdentifier(originalTable);
        requireSafeIdentifier(newTable);
        DataShardingMapper dataShardingMapper = BeanUtil.getBean(DataShardingMapper.class);
        dataShardingMapper.copyTables(originalTable, newTable);
    }

    /** Rejects table names that could alter the generated DDL statement. */
    private static void requireSafeIdentifier(String tableName) {
        if (tableName == null || !tableName.matches(SAFE_IDENTIFIER)) {
            throw new IllegalArgumentException("Illegal table name: " + tableName);
        }
    }
}

DataShardingMapper 复制表

/**
 * MyBatis mapper holding the DDL helper statements used for dynamic table sharding.
 */
@Mapper
public interface DataShardingMapper extends BaseMapper {
    // Disabled annotation-based lookup against information_schema, kept for reference:
    //@Select("select count(1) from information_schema.TABLES where TABLE_NAME = #{tableName} ")
    //Integer getTablesNumByName(@Param("tableName") String tableName);


    /**
     * Looks up a table by name.
     * NOTE(review): no SQL annotation is present here; presumably mapped in an XML mapper — confirm.
     *
     * @param table table name, bound as {@code table}
     * @return result of the mapped statement
     */
    String checkTableExistsWithShow(@Param("table") String table);

    /**
     * Creates {@code newTable} with the same structure as {@code originalTable}
     * ({@code CREATE TABLE ... LIKE ...}).
     *
     * <p>Both names are inserted with {@code ${}} text substitution, not as bind parameters,
     * so callers must only pass trusted, validated identifiers.
     *
     * @param originalTable template table name
     * @param newTable      name of the table to create
     */
    @Update("CREATE TABLE ${newTable} ( LIKE ${originalTable});")
    void copyTables(@Param("originalTable") String originalTable, @Param("newTable") String newTable);
}

部分语句不能在 ShardingSphere 中使用,这里采用折中的解决方法(直接通过 JDBC 查询);如果有其他方法,欢迎一起讨论。

package com.qzsoft.common.utils;

import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.yaml.snakeyaml.Yaml;

import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

@Slf4j
public class JDBCUtils {

    private static String url;
    private static String user;
    private static String password;
    private static String driver;

    /**
     * 文件的读取,只需要读取一次即可拿到这些值。使用静态代码块
     */
    static {
        //读取资源文件,获取值。
        try {
            Yaml pro = new Yaml();
            //获取src路径下的文件的方式--->ClassLoader 类加载器
            ClassLoader classLoader = JDBCUtils.class.getClassLoader();
            URL res = classLoader.getResource("application-common.yml");
            String path = res.getPath();
            JSONObject map = pro.loadAs(new FileReader(path), JSONObject.class);
            //获取数据,赋值
            JSONObject jsonObject = map.getJSONObject("spring").getJSONObject("shardingsphere").getJSONObject("datasource").getJSONObject("iot");
            url = jsonObject.getString("url");
            user = jsonObject.getString("username");
            password = jsonObject.getString("password");
            driver = "com.mysql.jdbc.Driver";
            //4. 注册驱动
            Class.forName(driver);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }


    public static Connection getConnection() throws SQLException {
        return DriverManager.getConnection(url, user, password);
    }


    public static void close(PreparedStatement preparedStatement, Connection connection) {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public static int getTableNumByTableName(String tableName){
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = getConnection();
            preparedStatement = connection.prepareStatement("select count(1) as num from information_schema.TABLES where TABLE_NAME = '" + tableName + "'");
            final ResultSet resultSet = preparedStatement.executeQuery();
            if (resultSet != null) {
                while (resultSet.next()) {
                    return resultSet.getInt("num");
                }
            }
        } catch (SQLException e) {
            log.error("查询失败!");
            e.printStackTrace();
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        } finally {
            close(preparedStatement, connection);
        }
        return 0;
    }

    public static List<String> getTableNamesByTableName(String tableName){
        List<String> tableNames = new ArrayList<>();
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = getConnection();
            preparedStatement = connection.prepareStatement("SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_NAME like '" +tableName+ "%' and table_schema = 'test'");
            final ResultSet resultSet = preparedStatement.executeQuery();
            if (resultSet != null) {
                while (resultSet.next()) {
                    final String name = resultSet.getString("TABLE_NAME");
                    tableNames.add(name);
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        } finally {
            close(preparedStatement, connection);
        }
        return tableNames;
    }
}

添加初始化表名索引的类。不加这个类,操作表时无法使用已存在的物理表。

主要目的是查询出以分表逻辑表名为前缀的物理表,然后将它们放入分片节点索引中。

/**
 * Start-up hook that rewrites the sharding table rules so that their actual data nodes list
 * the physical tables that already exist in the database.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class AutoConfigDateNodes implements ApplicationRunner {

    /** Name of the logical database whose rules are rewritten. */
    private static final String DATABASE_NAME = "logic_db";

    /** Property that lists the data source names, separated by commas. */
    private static final String DATASOURCE_NAMES_PROPERTY = "spring.shardingsphere.datasource.names";

    @Resource
    private Environment environment;

    @Resource
    private final DataSource dataSource;


    /**
     * Refreshes the physical table configuration once the application has started.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        log.info("进行自动更新sharding中物理表配置");
        refreshShardingNodes((ShardingSphereDataSource) dataSource);

    }

    /**
     * Rebuilds every sharding rule with refreshed table nodes and applies the result.
     * Rule configurations of other kinds are passed through untouched so they are not lost
     * when the rule set is replaced.
     */
    private void refreshShardingNodes(ShardingSphereDataSource ds) {
        ContextManager contextManager = getContextManager(ds);
        Collection<RuleConfiguration> addRuleConfigs = new LinkedList<>();
        // Currently configured rules of the logical database.
        Collection<RuleConfiguration> configurations = contextManager.getMetaDataContexts().getMetaData().getDatabases()
                .get(DATABASE_NAME).getRuleMetaData().getConfigurations();

        for (RuleConfiguration configuration : configurations) {
            if (configuration instanceof AlgorithmProvidedShardingRuleConfiguration) {
                addRuleConfigs.add(rebuildShardingRule((AlgorithmProvidedShardingRuleConfiguration) configuration));
            } else {
                addRuleConfigs.add(configuration);
            }
        }
        contextManager.alterRuleConfiguration(DATABASE_NAME, addRuleConfigs);
        // Put the updated context back into the data source.
        setContextManager(ds, contextManager);
    }

    /**
     * Copies a sharding rule, replacing each table rule by its refreshed version. A table
     * for which no refreshed rule can be built keeps its original rule, so one such table
     * does not discard the whole sharding configuration.
     */
    private AlgorithmProvidedShardingRuleConfiguration rebuildShardingRule(AlgorithmProvidedShardingRuleConfiguration source) {
        Map<String, ShardingAlgorithm> shardingAlgorithms = source.getShardingAlgorithms();
        Collection<ShardingTableRuleConfiguration> refreshedTables = new LinkedList<>();
        for (ShardingTableRuleConfiguration tableConfiguration : source.getTables()) {
            List<ShardingTableRuleConfiguration> tableRule = createTableRule(tableConfiguration, shardingAlgorithms);
            if (tableRule.isEmpty()) {
                refreshedTables.add(tableConfiguration);
            } else {
                refreshedTables.addAll(tableRule);
            }
        }

        AlgorithmProvidedShardingRuleConfiguration target = new AlgorithmProvidedShardingRuleConfiguration();
        target.setTables(refreshedTables);
        target.setAutoTables(source.getAutoTables());
        target.setBindingTableGroups(source.getBindingTableGroups());
        target.setBroadcastTables(source.getBroadcastTables());
        target.setDefaultDatabaseShardingStrategy(source.getDefaultDatabaseShardingStrategy());
        target.setDefaultTableShardingStrategy(source.getDefaultTableShardingStrategy());
        target.setDefaultKeyGenerateStrategy(source.getDefaultKeyGenerateStrategy());
        target.setDefaultShardingColumn(source.getDefaultShardingColumn());
        target.setShardingAlgorithms(shardingAlgorithms);
        target.setKeyGenerators(source.getKeyGenerators());
        return target;
    }

    /**
     * Reads the private {@code contextManager} field of the data source via reflection.
     *
     * @throws CustomException if the field cannot be read
     */
    private ContextManager getContextManager(ShardingSphereDataSource dataSource) {
        try {
            // Looked up on the declaring class so that subclasses of the data source also work.
            Field contextManagerField = ShardingSphereDataSource.class.getDeclaredField("contextManager");
            contextManagerField.setAccessible(true);
            return (ContextManager) contextManagerField.get(dataSource);
        }
        catch (Exception e) {
            log.error("Failed to read contextManager from the ShardingSphere data source", e);
            throw new CustomException(100500, "系统异常");
        }

    }

    /**
     * Writes the private {@code contextManager} field of the data source via reflection.
     *
     * @throws CustomException if the field cannot be written
     */
    private void setContextManager(ShardingSphereDataSource dataSource, ContextManager manager) {
        try {
            Field contextManagerField = ShardingSphereDataSource.class.getDeclaredField("contextManager");
            contextManagerField.setAccessible(true);
            contextManagerField.set(dataSource, manager);
        }
        catch (Exception e) {
            log.error("Failed to write contextManager to the ShardingSphere data source", e);
            throw new CustomException(100500, "系统异常");
        }

    }

    /**
     * Builds the refreshed rule of one logic table from the physical tables found in the database.
     *
     * @return a single-element list holding the refreshed rule, or an empty list when the
     *         table does not use a class-based algorithm, no physical table exists, or no
     *         data source name is configured
     */
    private List<ShardingTableRuleConfiguration> createTableRule(
            ShardingTableRuleConfiguration shardingTableRuleConfiguration,
            Map<String, ShardingAlgorithm> shardingAlgorithms) {
        List<ShardingTableRuleConfiguration> list = new ArrayList<>();
        if (shardingTableRuleConfiguration.getTableShardingStrategy() == null) {
            return list;
        }
        ShardingAlgorithm shardingAlgorithm = shardingAlgorithms
                .get(shardingTableRuleConfiguration.getTableShardingStrategy().getShardingAlgorithmName());
        if (shardingAlgorithm == null || shardingAlgorithm.getProps() == null) {
            return list;
        }
        // Fully-qualified name of the custom sharding class; only such tables are refreshed.
        String algorithmClassName = shardingAlgorithm.getProps().getProperty("algorithmClassName");
        if (StringUtils.isBlank(algorithmClassName)) {
            return list;
        }
        String logicTable = shardingTableRuleConfiguration.getLogicTable();
        // Physical tables whose name starts with the logic table name.
        List<String> tableNames = JDBCUtils.getTableNamesByTableName(logicTable);
        // At least one node has to remain, otherwise the table is not sharded at all;
        // the logic table itself is only kept when it is the sole match.
        if (tableNames.size() != 1) {
            tableNames.remove(logicTable);
        }
        if (tableNames.isEmpty()) {
            return list;
        }
        log.info("使用的全部表名{}", tableNames);

        String dataSourceNames = environment.getProperty(DATASOURCE_NAMES_PROPERTY);
        if (StringUtils.isBlank(dataSourceNames)) {
            return list;
        }
        // Data source names are comma-separated; every data source gets every table as a node.
        List<String> dataNodes = new ArrayList<>();
        for (String rawName : dataSourceNames.split(",")) {
            String dbName = rawName.trim();
            if (dbName.isEmpty()) {
                continue;
            }
            for (String tableName : tableNames) {
                dataNodes.add(dbName + "." + tableName);
            }
        }
        if (dataNodes.isEmpty()) {
            return list;
        }

        ShardingTableRuleConfiguration addTableConfiguration = new ShardingTableRuleConfiguration(logicTable,
                String.join(",", dataNodes));
        addTableConfiguration.setTableShardingStrategy(shardingTableRuleConfiguration.getTableShardingStrategy());
        addTableConfiguration.setDatabaseShardingStrategy(shardingTableRuleConfiguration.getDatabaseShardingStrategy());
        addTableConfiguration.setKeyGenerateStrategy(shardingTableRuleConfiguration.getKeyGenerateStrategy());
        list.add(addTableConfiguration);
        return list;

    }

}
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐