讲一下思路,就不贴代码了,完整代码都在https://github.com/kyrotiko/switch-datasource
第一步:动态切换数据源

思路

mybatis 执行大致流程

接口调用 >>> SqlSessionTemplate >>> 获取sqlsessionfactory >>> datasource >>> 执行

可以在sqlsessionTemplate处做路由,实现一个自定义CustomSqlSessionTemplate继承SqlSessionTemplate重写getSqlSessionFactory,这样子我们可以创建多个sqlsessionfactory每个sessionfactory包含自己的DataSource,将多个sqlsessionfactory注入到CustomSqlSessionTemplate中,使用一个Map来存储以<数据源名称,sqlsessionfactory>,然后对于mybatis来说仍然只有一个SqlSessionTemplate,当有sql需要执行的时候,mybatis会调用SqlSessionTemplate的getSqlSessionFactory方法来获取当前sessionfactory,由于我们是自己实现的CustomSqlSessionTemplate,可以重写getSqlSessionFactory来动态选择sessionfactory,最后执行sql自然也是到各自的DataSource中去执行。

对于如何做动态选择,可以使用一个ThreadLocal,在service代码中在执行sql前设置ThreadLocal为要操作的数据源名称,然后再getSqlSessionFactory方法中获取ThreadLocal的值来从Map<数据源名称,sqlsessionfactory>中选择sqlsessionfactory,原理如此,但实际开发中不会这么麻烦,可以用aop来绑定当前用户所使用的的DataSource

mybatis配置代码如下:

package com.jinshang.datasource.config.datasource;

import com.jinshang.datasource.config.datasource.utils.CustomSqlSessionTemplate;
import com.atomikos.jdbc.AtomikosDataSourceBean;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.mybatis.spring.boot.autoconfigure.SpringBootVFS;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @author yuanyang(417168602@qq.com)
 * @date 2019/4/3 10:42
 */

@Configuration
@Component
@MapperScan(basePackages = MybatisConfig.BASE_PACKAGE, sqlSessionTemplateRef = "sqlSessionTemplate")
public class MybatisConfig extends AbstractDataSourceConfig {

    //mapper模式下的接口层
    static final String BASE_PACKAGE = "com.jinshang.datasource.mapper";
    //对接数据库的实体层
    static final String ALIASES_PACKAGE = "com.jinshang.datasource.domain";
    static final String MAPPER_LOCATION = "classpath:mapper/*.xml";

    //创建新数据源时的配置信息
    @Value("${spring.datasource.druid.template.url}")
    private String url;
    @Value("${spring.datasource.druid.template.username}")
    private String username;
    @Value("${spring.datasource.druid.template.password}")
    private String password;
    @Value("${spring.datasource.druid.template.driver-class-name}")
    private String driverClassName;


    @Primary
    @Bean(name = "dataSourceSystem")
    public DataSource dataSourceOne(Environment env) {
        String prefix = "spring.datasource.druid.system.";
        return getDataSource(env, prefix, "system");
    }

    @Bean(name = "sqlSessionFactorySystem")
    public SqlSessionFactory sqlSessionFactoryOne(@Qualifier("dataSourceSystem") DataSource dataSource)
            throws Exception {
        return createSqlSessionFactory(dataSource);
    }


    @Bean(name = "sqlSessionTemplate")
    public CustomSqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactorySystem") SqlSessionFactory factorySystem) throws Exception {
        Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
        sqlSessionFactoryMap.put("system", factorySystem);
        CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factorySystem);
        customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
        return customSqlSessionTemplate;
    }

    /**
     * 创建数据源
     *
     * @param dataSource
     * @return
     */
    public SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setVfs(SpringBootVFS.class);
        bean.setTypeAliasesPackage(ALIASES_PACKAGE);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
        bean.setConfiguration(configuration());
        return bean.getObject();
    }

    public org.apache.ibatis.session.Configuration configuration() {
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setMapUnderscoreToCamelCase(true);
        return configuration;
    }

    public DataSource getDataSource(String dataSourceName) {
        Properties prop = new Properties();
        prop.put("url", this.url.replace("{dbName}", dataSourceName));
        prop.put("username", this.username);
        prop.put("password", this.password);
        prop.put("driverClassName", this.driverClassName);
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName(dataSourceName);
        ds.setXaProperties(prop);
        return ds;
    }
}
CustomSqlSessionTemplate代码如下
package com.jinshang.datasource.config.datasource.utils;

import static java.lang.reflect.Proxy.newProxyInstance;
import static org.apache.ibatis.reflection.ExceptionUtil.unwrapThrowable;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
import static org.mybatis.spring.SqlSessionUtils.getSqlSession;
import static org.mybatis.spring.SqlSessionUtils.isSqlSessionTransactional;


import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.List;
import java.util.Map;


import org.apache.ibatis.exceptions.PersistenceException;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.MyBatisExceptionTranslator;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.util.Assert;


/**
 * TODO: 增加描述
 *
 * @author Administrator
 * @version 0.1.0
 * @date 2015年9月22日 下午6:27:47
 * @copyright stnts.com
 */
public class CustomSqlSessionTemplate extends SqlSessionTemplate {

    private final SqlSessionFactory sqlSessionFactory;
    private final ExecutorType executorType;
    private final SqlSession sqlSessionProxy;
    private final PersistenceExceptionTranslator exceptionTranslator;

    private Map<Object, SqlSessionFactory> targetSqlSessionFactorys;
    private SqlSessionFactory defaultTargetSqlSessionFactory;

    public void setTargetSqlSessionFactorys(Map<Object, SqlSessionFactory> targetSqlSessionFactorys) {
        this.targetSqlSessionFactorys = targetSqlSessionFactorys;
    }

    public Map<Object, SqlSessionFactory> getTargetSqlSessionFactorys() {
        return targetSqlSessionFactorys;
    }

    public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
        this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
        this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
        this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()
                .getEnvironment().getDataSource(), true));
    }

    public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
                                    PersistenceExceptionTranslator exceptionTranslator) {

        super(sqlSessionFactory, executorType, exceptionTranslator);

        this.sqlSessionFactory = sqlSessionFactory;
        this.executorType = executorType;
        this.exceptionTranslator = exceptionTranslator;

        this.sqlSessionProxy = (SqlSession) newProxyInstance(
                SqlSessionFactory.class.getClassLoader(),
                new Class[]{SqlSession.class},
                new SqlSessionInterceptor());

        this.defaultTargetSqlSessionFactory = sqlSessionFactory;
    }

    @Override
    public SqlSessionFactory getSqlSessionFactory() {
//        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(SessionUtils.getCurrentDataSource());
        SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(DataSourceContextHolder.getDatasourceType());
        if (targetSqlSessionFactory != null) {
            return targetSqlSessionFactory;
        } else if (defaultTargetSqlSessionFactory != null) {
            return defaultTargetSqlSessionFactory;
        } else {
            Assert.notNull(targetSqlSessionFactorys, "Property 'targetSqlSessionFactorys' or 'defaultTargetSqlSessionFactory' are required");
            Assert.notNull(defaultTargetSqlSessionFactory, "Property 'defaultTargetSqlSessionFactory' or 'targetSqlSessionFactorys' are required");
        }
        return this.sqlSessionFactory;
    }

    @Override
    public Configuration getConfiguration() {
        return this.getSqlSessionFactory().getConfiguration();
    }

    public ExecutorType getExecutorType() {
        return this.executorType;
    }

    public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
        return this.exceptionTranslator;
    }

    /**
     * {@inheritDoc}
     */
    public <T> T selectOne(String statement) {
        return this.sqlSessionProxy.<T>selectOne(statement);
    }

    /**
     * {@inheritDoc}
     */
    public <T> T selectOne(String statement, Object parameter) {
        return this.sqlSessionProxy.<T>selectOne(statement, parameter);
    }

    /**
     * {@inheritDoc}
     */
    public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
        return this.sqlSessionProxy.<K, V>selectMap(statement, mapKey);
    }

    /**
     * {@inheritDoc}
     */
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
        return this.sqlSessionProxy.<K, V>selectMap(statement, parameter, mapKey);
    }

    /**
     * {@inheritDoc}
     */
    public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
        return this.sqlSessionProxy.<K, V>selectMap(statement, parameter, mapKey, rowBounds);
    }

    /**
     * {@inheritDoc}
     */
    public <E> List<E> selectList(String statement) {
        return this.sqlSessionProxy.<E>selectList(statement);
    }

    /**
     * {@inheritDoc}
     */
    public <E> List<E> selectList(String statement, Object parameter) {
        return this.sqlSessionProxy.<E>selectList(statement, parameter);
    }

    /**
     * {@inheritDoc}
     */
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
        return this.sqlSessionProxy.<E>selectList(statement, parameter, rowBounds);
    }

    /**
     * {@inheritDoc}
     */
    public void select(String statement, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, handler);
    }

    /**
     * {@inheritDoc}
     */
    public void select(String statement, Object parameter, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, handler);
    }

    /**
     * {@inheritDoc}
     */
    public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
        this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
    }

    /**
     * {@inheritDoc}
     */
    public int insert(String statement) {
        return this.sqlSessionProxy.insert(statement);
    }

    /**
     * {@inheritDoc}
     */
    public int insert(String statement, Object parameter) {
        return this.sqlSessionProxy.insert(statement, parameter);
    }

    /**
     * {@inheritDoc}
     */
    public int update(String statement) {
        return this.sqlSessionProxy.update(statement);
    }

    /**
     * {@inheritDoc}
     */
    public int update(String statement, Object parameter) {
        return this.sqlSessionProxy.update(statement, parameter);
    }

    /**
     * {@inheritDoc}
     */
    public int delete(String statement) {
        return this.sqlSessionProxy.delete(statement);
    }

    /**
     * {@inheritDoc}
     */
    public int delete(String statement, Object parameter) {
        return this.sqlSessionProxy.delete(statement, parameter);
    }

    /**
     * {@inheritDoc}
     */
    public <T> T getMapper(Class<T> type) {
        return getConfiguration().getMapper(type, this);
    }

    /**
     * {@inheritDoc}
     */
    public void commit() {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }

    /**
     * {@inheritDoc}
     */
    public void commit(boolean force) {
        throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
    }

    /**
     * {@inheritDoc}
     */
    public void rollback() {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }

    /**
     * {@inheritDoc}
     */
    public void rollback(boolean force) {
        throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
    }

    /**
     * {@inheritDoc}
     */
    public void close() {
        throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
    }

    /**
     * {@inheritDoc}
     */
    public void clearCache() {
        this.sqlSessionProxy.clearCache();
    }

    /**
     * {@inheritDoc}
     */
    public Connection getConnection() {
        return this.sqlSessionProxy.getConnection();
    }

    /**
     * {@inheritDoc}
     *
     * @since 1.0.2
     */
    public List<BatchResult> flushStatements() {
        return this.sqlSessionProxy.flushStatements();
    }

    /**
     * Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also
     * unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
     * the {@code PersistenceExceptionTranslator}.
     */
    private class SqlSessionInterceptor implements InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            final SqlSession sqlSession = getSqlSession(
                    CustomSqlSessionTemplate.this.getSqlSessionFactory(),
                    CustomSqlSessionTemplate.this.executorType,
                    CustomSqlSessionTemplate.this.exceptionTranslator);
            try {
                Object result = method.invoke(sqlSession, args);
                if (!isSqlSessionTransactional(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory())) {
                    // force commit even on non-dirty sessions because some databases require
                    // a commit/rollback before calling close()
                    sqlSession.commit(true);
                }
                return result;
            } catch (Throwable t) {
                Throwable unwrapped = unwrapThrowable(t);
                if (CustomSqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
                    Throwable translated = CustomSqlSessionTemplate.this.exceptionTranslator
                            .translateExceptionIfPossible((PersistenceException) unwrapped);
                    if (translated != null) {
                        unwrapped = translated;
                    }
                }
                throw unwrapped;
            } finally {
                closeSqlSession(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory());
            }
        }
    }

}

第二步:分布式事务(夸库事务)

有多个数据源的情况下在同一方法中切换数据源会导致事务的回滚失败,只会默认回滚第一个数据源的事务

思路:可以使用atomikos来支持分布式事务,具体代码如下

protected DataSource getDataSource(Environment env, String prefix, String dataSourceName) {
    Properties prop = build(env, prefix);
    AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
    ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
    ds.setUniqueResourceName(dataSourceName);
    ds.setXaProperties(prop);
    return ds;
}

手动创建DataSource,我猜测atomikos原理是代理了数据源,然后阻塞commit,总之有了atomikos就可以支持分布式事务了

第三步:动态添加数据源

思路:不多讲了,第一步创建DataSource,使用DataSource创建SqlSessionFactory,然后将此SqlSessionFactory注入到CustomSqlSessionTemplate的Map<数据源名称,sqlsessionfactory>中,需要注意的是,我们在操作CustomSqlSessionTemplate的sqlsessionfactory的Map时需要考虑线程安全问题,因此最好的做法是创建一个新的Map然后将老的数据导进去然后把新创建的数据源导进去,然后替换原始的map

    public SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        bean.setVfs(SpringBootVFS.class);
        bean.setTypeAliasesPackage(ALIASES_PACKAGE);
        bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATION));
        bean.setConfiguration(configuration());
        return bean.getObject();
    }

    public org.apache.ibatis.session.Configuration configuration() {
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setMapUnderscoreToCamelCase(true);
        return configuration;
    }

    public DataSource getDataSource(String dataSourceName) {
        Properties prop = new Properties();
        prop.put("url", this.url.replace("{dbName}", dataSourceName));
        prop.put("username", this.username);
        prop.put("password", this.password);
        prop.put("driverClassName", this.driverClassName);
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName(dataSourceName);
        ds.setXaProperties(prop);
        return ds;
    }

    /**
     * 初始或者重载入数据源
     *
     * @throws Exception
     */
    @PostConstruct
    public void loadDataSource() throws Exception {
        List<Account> accountList = accountService.queryAll();
        Map<Object, SqlSessionFactory> newSqlSessionFactoryMap = new HashMap<>(16);
        Map<Object, SqlSessionFactory> sqlSessionFactoryMap = sqlSessionTemplate.getTargetSqlSessionFactorys();
        for (Account account : accountList) {
            SqlSessionFactory sqlSessionFactory = mybatisConfig.createSqlSessionFactory(mybatisConfig.getDataSource(account.getDbName()));
            newSqlSessionFactoryMap.put(account.getDbName(), sqlSessionFactory);
        }
        newSqlSessionFactoryMap.putAll(sqlSessionFactoryMap);
        this.sqlSessionTemplate.setTargetSqlSessionFactorys(newSqlSessionFactoryMap);
    }

第四步:动态创建数据源

思路:由于我们使用了xa事务,在使用datasource的connection去执行建表建库语句会导致报错,我们只能通过原始的jdbc去创建connection执行建库语句,然后重新载入数据源!!!需要注意是但是我们必须得手动控制事务!!!有异常要往上抛

package com.jinshang.datasource.services.impl;

import com.jinshang.datasource.config.datasource.utils.DataSourceManager;
import com.jinshang.datasource.domain.Account;
import com.jinshang.datasource.mapper.AccountMapper;
import com.jinshang.datasource.services.AccountService;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.sql.*;
import java.util.List;

/**
 * @author yuanyang(417168602@qq.com)
 * @date 2019/4/3 13:12
 */
@Service
public class AccountServiceImpl implements AccountService {

    @Autowired
    private AccountMapper accountMapper;

    @Autowired
    private DataSourceManager dataSourceManager;

    @Autowired
    private SqlSessionTemplate sqlSessionTemplate;

    @Value("${spring.datasource.druid.system.url}")
    private String url;
    @Value("${spring.datasource.druid.system.username}")
    private String username;
    @Value("${spring.datasource.druid.system.password}")
    private String password;
    @Value("${spring.datasource.druid.system.driver-class-name}")
    private String driverClassName;


    @Override
    public void insert(Account account) {
        accountMapper.insert(account);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void createNewDataSource(String dbName) throws Exception {
        Account account = new Account();
        account.setDbName(dbName);
        createDataBase(dbName);
        accountMapper.insert(account);
        dataSourceManager.loadDataSource();
    }


    private void createDataBase(String dbName) throws Exception {
//        accountMapper.createDatabase(dbName);
        Connection connection = null;
        Statement st = null;
        try {
            Class.forName(driverClassName);
            connection = DriverManager.getConnection(url, username, password);
            connection.setAutoCommit(false);
            st = connection.createStatement();
            st.execute("create database " + dbName);
            st.execute("use " + dbName);
            st.execute("SET NAMES utf8mb4;\n" +
                    "SET FOREIGN_KEY_CHECKS = 0;\n" +
                    "\n" +
                    "DROP TABLE IF EXISTS `user`;\n" +
                    "CREATE TABLE `user`  (\n" +
                    "  `id` int(11) NOT NULL AUTO_INCREMENT,\n" +
                    "  `name` varchar(255) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL,\n" +
                    "  PRIMARY KEY (`id`) USING BTREE\n" +
                    ") ENGINE = InnoDB AUTO_INCREMENT = 32 CHARACTER SET = latin1 COLLATE = latin1_swedish_ci ROW_FORMAT = Dynamic;\n" +
                    "\n" +
                    "SET FOREIGN_KEY_CHECKS = 1;\n");
            connection.commit();
        } catch (Exception e) {
            connection.rollback();
            throw e;
        } finally {
            if (st != null) {
                st.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    @Override
    public List<Account> queryAll() {
        return accountMapper.select();
    }
}

完整代码都在https://github.com/kyrotiko/switch-datasource

 

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐