sharding-jdbc分庫分表


  對於分片數據庫的主鍵一般通過自己生產主鍵避免用數據庫自帶的自增主鍵。比如用redis生產自增主鍵、mysql用一個IdManager生產自增主鍵(每次從表中取一個主鍵)。

1.簡介

1.1分片鍵

  用於分片的數據庫字段,是將數據庫(表)水平拆分的關鍵字段。例:將訂單表中的訂單主鍵的尾數取模分片,則訂單主鍵為分片字段。 SQL中如果無分片字段,將執行全路由,性能較差。 除了對單分片字段的支持,ShardingSphere也支持根據多個字段進行分片。

1.2 分片算法

通過分片算法將數據分片,支持通過=、>=、<=、>、<、BETWEEN和IN分片。分片算法需要應用方開發者自行實現,可實現的靈活度非常高。

目前提供4種分片算法

(1)精確分片算法

  對應PreciseShardingAlgorithm,用於處理使用單一鍵作為分片鍵的=與IN進行分片的場景。需要配合StandardShardingStrategy使用。

(2)范圍分片算法

  對應RangeShardingAlgorithm,用於處理使用單一鍵作為分片鍵的BETWEEN AND、>、<、>=、<=進行分片的場景。需要配合StandardShardingStrategy使用。

(3)復合分片算法

  對應ComplexKeysShardingAlgorithm,用於處理使用多鍵作為分片鍵進行分片的場景,包含多個分片鍵的邏輯較復雜,需要應用開發者自行處理其中的復雜度。需要配合ComplexShardingStrategy使用

(4)Hint分片算法

  對應HintShardingAlgorithm,用於處理使用Hint行分片的場景。需要配合HintShardingStrategy使用。

 

1.3 分片策略

包含分片鍵和分片算法,由於分片算法的獨立性,將其獨立抽離。真正可用於分片操作的是分片鍵 + 分片算法,也就是分片策略。目前提供5種分片策略。

(1)標准分片策略

  對應StandardShardingStrategy。提供對SQL語句中的=, >, <, >=, <=, IN和BETWEEN AND的分片操作支持。StandardShardingStrategy只支持單分片鍵,提供PreciseShardingAlgorithm和RangeShardingAlgorithm兩個分片算法。PreciseShardingAlgorithm是必選的,用於處理=和IN的分片。RangeShardingAlgorithm是可選的,用於處理BETWEEN AND, >, <, >=, <=分片,如果不配置RangeShardingAlgorithm,SQL中的BETWEEN AND將按照全庫路由處理。

(2)復合分片策略

  對應ComplexShardingStrategy。復合分片策略。提供對SQL語句中的=, >, <, >=, <=, IN和BETWEEN AND的分片操作支持。ComplexShardingStrategy支持多分片鍵,由於多分片鍵之間的關系復雜,因此並未進行過多的封裝,而是直接將分片鍵值組合以及分片操作符透傳至分片算法,完全由應用開發者實現,提供最大的靈活度。

(3)行表達式分片策略

  對應InlineShardingStrategy。使用Groovy的表達式,提供對SQL語句中的=和IN的分片操作支持,只支持單分片鍵。對於簡單的分片算法,可以通過簡單的配置使用,從而避免繁瑣的Java代碼開發,如: t_user_$->{u_id % 8} 表示t_user表根據u_id模8,而分成8張表,表名稱為t_user_0到t_user_7。

(4)Hint分片策略

  對應HintShardingStrategy。通過Hint指定分片值而非從SQL中提取分片值的方式進行分片的策略。

(5)不分片策略

  對應NoneShardingStrategy。不分片的策略。

 

2. SQL建庫建表語句如下:

DROP SCHEMA IF EXISTS demo_ds_0;
DROP SCHEMA IF EXISTS demo_ds_1;

CREATE SCHEMA IF NOT EXISTS demo_ds_0;
CREATE SCHEMA IF NOT EXISTS demo_ds_1;


CREATE TABLE IF NOT EXISTS demo_ds_0.t_order_0 (order_id BIGINT NOT NULL, user_id INT NOT NULL, STATUS VARCHAR(50), PRIMARY KEY (order_id));
CREATE TABLE IF NOT EXISTS demo_ds_0.t_order_1 (order_id BIGINT NOT NULL, user_id INT NOT NULL, STATUS VARCHAR(50), PRIMARY KEY (order_id));
CREATE TABLE IF NOT EXISTS demo_ds_1.t_order_0 (order_id BIGINT NOT NULL, user_id INT NOT NULL, STATUS VARCHAR(50), PRIMARY KEY (order_id));
CREATE TABLE IF NOT EXISTS demo_ds_1.t_order_1 (order_id BIGINT NOT NULL, user_id INT NOT NULL, STATUS VARCHAR(50), PRIMARY KEY (order_id));

 

3. 不使用Spring的方式

1.pom配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.qlq</groupId>
    <artifactId>shared</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.shardingsphere/sharding-jdbc-core -->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-core</artifactId>
            <version>4.0.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
            <version>2.1.1</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>

        <!-- commons工具包 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>

    </dependencies>

    <build>
        <!-- 配置了很多插件 -->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <!-- spring熱部署 -->
                <!-- <dependencies> <dependency> <groupId>org.springframework</groupId> 
                    <artifactId>springloaded</artifactId> <version>1.2.6.RELEASE</version> </dependency> 
                    </dependencies> -->
            </plugin>
        </plugins>
    </build>
</project>

 

2. 基於java的配置(使用行表達式分片策略簡單測試)

package shared;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomUtils;
import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.strategy.InlineShardingStrategyConfiguration;
import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;

public class DataSourceUtils {
public static DataSource getDataSource() throws SQLException {
        // 配置真實數據源
        Map<String, DataSource> dataSourceMap = new HashMap<>();

        // 配置第一個數據源
        BasicDataSource dataSource1 = new BasicDataSource();
        dataSource1.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource1.setUrl("jdbc:mysql://localhost:3306/demo_ds_0");
        dataSource1.setUsername("root");
        dataSource1.setPassword("123456");
        dataSourceMap.put("demo_ds_0", dataSource1);

        // 配置第二個數據源
        BasicDataSource dataSource2 = new BasicDataSource();
        dataSource2.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource2.setUrl("jdbc:mysql://localhost:3306/demo_ds_1");
        dataSource2.setUsername("root");
        dataSource2.setPassword("123456");
        dataSourceMap.put("demo_ds_1", dataSource2);

        // 配置Order表規則
        TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order",
                "demo_ds_${0..1}.t_order_${0..1}");

        // 配置分庫 + 分表策略
        orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                new InlineShardingStrategyConfiguration("user_id", "demo_ds_${user_id % 2}"));
        orderTableRuleConfig.setTableShardingStrategyConfig(
                new InlineShardingStrategyConfiguration("order_id", "t_order_${order_id % 2}"));

        // 配置分片規則
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);

        // 省略配置order_item表規則...
        // ...

        // 獲取數據源對象
        DataSource dataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig,
                new Properties());
        return dataSource;
    }
}

上面規則:

(1)根據user_id進行分庫。偶數在demo_ds_0;奇數在demo_ds_1

(2)根據order_id進行分表。偶數在t_order_0;奇數在t_order_1

 

3.測試

1.測試增加。(注意:ID要生成,不能用mysql的自增ID。會導致數據重復)。這里可以用基於redis生成唯一ID或者用IdManager從數據庫生成ID。

    private static void testAdd() throws SQLException {
        Connection conn = getDataSource().getConnection();
        for (int i = 1; i < 21; i++) {
            int user_id = RandomUtils.nextInt(1, 60);
            String sql = "insert into t_order(order_id,user_id, status) values(" + i + "," + user_id + "," + "'狀態" + i
                    + "'" + ") ";
            System.out.println(sql);
            PreparedStatement preparedStatement = conn.prepareStatement(sql);
            boolean execute = preparedStatement.execute();
        }
    }

結果:

(1)查詢demo_ds_0.t_order_0

SELECT * FROM demo_ds_0.t_order_0

 

 (2)查詢demo_ds_0.t_order_1 

SELECT * FROM demo_ds_0.t_order_1

 (3)查詢demo_ds_1.t_order_0

SELECT * FROM demo_ds_1.t_order_0

 

 (4)查詢demo_ds_1.t_order_1

SELECT * FROM demo_ds_1.t_order_1

2.測試查詢

    private static void testselect() throws SQLException {
        Connection conn = getDataSource().getConnection();
        String sql = "select * from t_order";
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        ResultSet executeQuery = preparedStatement.executeQuery();
        while (executeQuery.next()) {
            int order_id = executeQuery.getInt("order_id");
            int user_id = executeQuery.getInt("user_id");
            String status = executeQuery.getString("status");
            System.out.print("order_id: " + order_id);
            System.out.print("\tuser_id: " + user_id);
            System.out.println("\tstatus: " + status);
        }
    }

結果:

order_id: 4user_id: 30status: 狀態4

order_id: 8user_id: 28status: 狀態8

order_id: 10user_id: 14status: 狀態10

order_id: 12user_id: 42status: 狀態12

order_id: 14user_id: 38status: 狀態14

order_id: 16user_id: 36status: 狀態16

order_id: 18user_id: 30status: 狀態18

order_id: 20user_id: 4status: 狀態20

order_id: 7user_id: 34status: 狀態7

order_id: 9user_id: 40status: 狀態9

order_id: 11user_id: 44status: 狀態11

order_id: 13user_id: 22status: 狀態13

order_id: 15user_id: 12status: 狀態15

order_id: 2user_id: 57status: 狀態2

order_id: 6user_id: 11status: 狀態6

order_id: 1user_id: 53status: 狀態1

order_id: 3user_id: 15status: 狀態3

order_id: 5user_id: 13status: 狀態5

order_id: 17user_id: 7status: 狀態17

order_id: 19user_id: 31status: 狀態19

注意:

(1)上面的分片算法不支持范圍查詢,比如:

select * from t_order where order_id < 15

報錯如下:

Exception in thread "main" java.lang.IllegalStateException: Inline strategy cannot support range sharding.
    at com.google.common.base.Preconditions.checkState(Preconditions.java:173)

 

(2)支持精確查詢,比如:

select * from t_order where order_id = 15

結果:

order_id: 15user_id: 12status: 狀態15

 

3. 使自定義分片策略

  自定義分庫算法,這里實現一個最簡單的分庫算法。

規則同上:

1. 根據user_id進行分庫。偶數在demo_ds_0;奇數在demo_ds_1

2. 據order_id進行分表。偶數在t_order_0;奇數在t_order_1

 

代碼如下: 分庫算法與分表算法都實現 ComplexKeysShardingAlgorithm 接口。

分庫算法:

package shared;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.IteratorUtils;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;

public class MyDBComplexShardingStrategy implements ComplexKeysShardingAlgorithm<Comparable<?>> {

    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
            ComplexKeysShardingValue<Comparable<?>> shardingValue) {

        System.out.println("=====MyDBComplexShardingStrategy=====");
        System.out.println(availableTargetNames);
        System.out.println(shardingValue);

        if (CollectionUtils.isEmpty(availableTargetNames)) {
            throw new RuntimeException("可用數據庫為空");
        }

        List<String> result = new ArrayList<>();

        Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = shardingValue
                .getColumnNameAndShardingValuesMap();
        Set<String> keySet = columnNameAndShardingValuesMap.keySet();
        for (String key : keySet) {
            if (!"user_id".equals(key)) {
                continue;
            }

            Collection<Comparable<?>> collection = columnNameAndShardingValuesMap.get(key);
            Iterator<Comparable<?>> iterator = collection.iterator();
            while (iterator.hasNext()) {
                Integer next = (Integer) iterator.next();
                Integer index = next % 2;
                String availableTargetName = IteratorUtils.get(availableTargetNames.iterator(), index);
                result.add(availableTargetName);
            }
        }

        System.out.println(result);

        return result;
    }

}

doSharding方法解釋:

 入參:

availableTargetNames:  可用的數據庫集合。 [demo_ds_0, demo_ds_1]

 shardingValue: 是指分庫的字段以及值集合。 ComplexKeysShardingValue(logicTableName=t_order, columnNameAndShardingValuesMap={user_id=[25]}, columnNameAndRangeValuesMap={})

 返回值:

滿足條件的數據庫。返回集合表示同時向多個庫中插入。

 

分表算法:

package shared;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.IteratorUtils;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;

public class MyTableComplexShardingStrategy implements ComplexKeysShardingAlgorithm<Comparable<?>> {

    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
            ComplexKeysShardingValue<Comparable<?>> shardingValue) {

        System.out.println("=====MyTableComplexShardingStrategy=====");
        System.out.println(availableTargetNames);
        System.out.println(shardingValue);

        if (CollectionUtils.isEmpty(availableTargetNames)) {
            throw new RuntimeException("可用數據表為空");
        }

        List<String> result = new ArrayList<>();

        Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = shardingValue
                .getColumnNameAndShardingValuesMap();
        Set<String> keySet = columnNameAndShardingValuesMap.keySet();
        for (String key : keySet) {
            if (!"order_id".equals(key)) {
                continue;
            }

            Collection<Comparable<?>> collection = columnNameAndShardingValuesMap.get(key);
            Iterator<Comparable<?>> iterator = collection.iterator();
            while (iterator.hasNext()) {
                Integer next = (Integer) iterator.next();
                Integer index = next % 2;
                String availableTargetName = IteratorUtils.get(availableTargetNames.iterator(), index);
                result.add(availableTargetName);
            }
        }

        System.out.println(result);

        return result;
    }

}

doSharding方法解釋:

 入參:

availableTargetNames:  可用的數據表集合。 [t_order_0, t_order_1]

 shardingValue: 是指分表的字段以及值集合。 ComplexKeysShardingValue(logicTableName=t_order, columnNameAndShardingValuesMap={order_id=[1]}, columnNameAndRangeValuesMap={})

 返回值:

  滿足條件的數據表。返回集合表示同時向多個表中插入。

 

獲取數據源代碼以及測試代碼:

package shared;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomUtils;
import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.strategy.ComplexShardingStrategyConfiguration;
import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;

public class DataSourceUtils {

    public static void main(String[] args) throws SQLException {
        testAdd();
    }

    private static void testAdd() throws SQLException {
        Connection conn = getDataSource().getConnection();
        for (int i = 1; i < 21; i++) {
            int user_id = RandomUtils.nextInt(1, 60);
            String sql = "insert into t_order(order_id,user_id, status) values(" + i + "," + user_id + "," + "'狀態" + i
                    + "'" + ") ";
            System.out.println(sql);
            PreparedStatement preparedStatement = conn.prepareStatement(sql);
            boolean execute = preparedStatement.execute();
        }
    }

    public static DataSource getDataSource() throws SQLException {
        // 配置真實數據源
        Map<String, DataSource> dataSourceMap = new HashMap<>();

        // 配置第一個數據源
        BasicDataSource dataSource1 = new BasicDataSource();
        dataSource1.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource1.setUrl("jdbc:mysql://localhost:3306/demo_ds_0");
        dataSource1.setUsername("root");
        dataSource1.setPassword("123456");
        dataSourceMap.put("demo_ds_0", dataSource1);

        // 配置第二個數據源
        BasicDataSource dataSource2 = new BasicDataSource();
        dataSource2.setDriverClassName("com.mysql.jdbc.Driver");
        dataSource2.setUrl("jdbc:mysql://localhost:3306/demo_ds_1");
        dataSource2.setUsername("root");
        dataSource2.setPassword("123456");
        dataSourceMap.put("demo_ds_1", dataSource2);

        // 配置Order表規則(t_order 是邏輯表名,
        // demo_ds_${0..1}.t_order_${0..1}是實際的節點數(groovy表達式))
        TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order",
                "demo_ds_${0..1}.t_order_${0..1}");

        // 配置分庫 + 分表策略
        orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                new ComplexShardingStrategyConfiguration("user_id", new MyDBComplexShardingStrategy()));
        orderTableRuleConfig.setTableShardingStrategyConfig(
                new ComplexShardingStrategyConfiguration("order_id", new MyTableComplexShardingStrategy()));

        // 配置分片規則
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);

        // 獲取數據源對象
        DataSource dataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig,
                new Properties());
        return dataSource;
    }
}

解釋:

new TableRuleConfiguration("t_order",  ${0..1}.t_order_${0..1}")

   t_order 是邏輯表名、demo_ds_${0..1}.t_order_${0..1} 是實際的節點數(一個groovy范圍表達式)。而且對應的數據庫以及數據表必須存在。會進行驗證。

 

orderTableRuleConfig.setDatabaseShardingStrategyConfig(new ComplexShardingStrategyConfiguration("user_id, order_id", new MyDBComplexShardingStrategy()));

  如上面代碼可以設置多個分庫或者分表的字段。多個用逗號分隔即可。后面doSharding 獲取到的就是map中的兩對值,如下:

ComplexKeysShardingValue(logicTableName=t_order, columnNameAndShardingValuesMap={user_id=[45], order_id=[1]}, columnNameAndRangeValuesMap={})

 

4. 結合SpringBoot在spring中使用

1.pom中引入

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
        </dependency>
        <!-- sharding.jdbc for spring boot -->
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
            <version>4.0.0</version>
        </dependency>

 

2. application.properties中配置

###############S shardingsphere setting####
spring.shardingsphere.datasource.names=demo_ds_0,demo_ds_1

spring.shardingsphere.datasource.demo_ds_0.type=org.apache.commons.dbcp2.BasicDataSource
spring.shardingsphere.datasource.demo_ds_0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.demo_ds_0.url=jdbc:mysql://localhost:3306/demo_ds_0
spring.shardingsphere.datasource.demo_ds_0.username=root
spring.shardingsphere.datasource.demo_ds_0.password=123456

spring.shardingsphere.datasource.demo_ds_1.type=org.apache.commons.dbcp2.BasicDataSource
spring.shardingsphere.datasource.demo_ds_1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.demo_ds_1.url=jdbc:mysql://localhost:3306/demo_ds_1
spring.shardingsphere.datasource.demo_ds_1.username=root
spring.shardingsphere.datasource.demo_ds_1.password=123456

spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=demo_ds_$->{user_id % 2}

spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=demo_ds_$->{0..1}.t_order_$->{0..1}
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expression=t_order_$->{order_id % 2}
###############E shardingsphere setting####

 

3.Junit 測試

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.annotation.Resource;
import javax.sql.DataSource;

import org.apache.commons.lang3.RandomUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import cn.qlq.MySpringBootApplication;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = MySpringBootApplication.class)
public class PlainTest {

    @Resource
    private DataSource dataSource;

    @Test
    public void testAdd() throws SQLException {
        Connection conn = dataSource.getConnection();
        for (int i = 1; i < 21; i++) {
            int user_id = RandomUtils.nextInt(1, 60);
            String sql = "insert into t_order(order_id,user_id, status) values(" + i + "," + user_id + "," + "'狀態" + i
                    + "'" + ") ";
            System.out.println(sql);
            PreparedStatement preparedStatement = conn.prepareStatement(sql);
            boolean execute = preparedStatement.execute();
        }
    }
}

結果同上面不使用springboot結果一樣。

補充:shardingjdbc可以設置默認的數據源,也就是不想分庫分表的使用默認的數據源。

例如:

  shardingRuleConfig.setDefaultDataSourceName("defaultDataSource") 是設置默認的數據源,不分片的表都采用默認的數據源。

package cn.qlq.sharedjdbc.config;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.strategy.ComplexShardingStrategyConfiguration;
import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DataSourceConfig {

    @Bean
    public static DataSource dataSource() throws SQLException {
        // 配置真實數據源
        Map<String, DataSource> dataSourceMap = new HashMap<>();

        // 配置第一個數據源
        BasicDataSource dataSource0 = createDataSource("jdbc:mysql://localhost:3306/demo_ds_0", "root", "123456");
        dataSourceMap.put("demo_ds_0", dataSource0);

        // 配置第二個數據源
        BasicDataSource dataSource1 = createDataSource("jdbc:mysql://localhost:3306/demo_ds_1", "root", "123456");
        dataSourceMap.put("demo_ds_1", dataSource1);

        // 配置默認數據源
        BasicDataSource defaultDataSource = createDataSource("jdbc:mysql://localhost:3306/test1", "root", "123456");
        dataSourceMap.put("defaultDataSource", defaultDataSource);

        // 配置Order表規則(t_order 是邏輯表名,
        // demo_ds_${0..1}.t_order_${0..1}是實際的節點數(groovy表達式))
        TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order",
                "demo_ds_${0..1}.t_order_${0..1}");
        // 配置分庫 + 分表策略
        orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                new ComplexShardingStrategyConfiguration("user_id", new MyDBComplexShardingStrategy()));
        orderTableRuleConfig.setTableShardingStrategyConfig(
                new ComplexShardingStrategyConfiguration("order_id", new MyTableComplexShardingStrategy()));

        // 配置分片規則
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        // shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
        // 設置默認數據源
        shardingRuleConfig.setDefaultDataSourceName("defaultDataSource");
        // 設置不分片的表
        // shardingRuleConfig.setBindingTableGroups(Arrays.asList("country"));

        // 獲取數據源對象
        DataSource createDataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig,
                new Properties());
        return createDataSource;
    }

    private static BasicDataSource createDataSource(String url, String username, String password) {
        BasicDataSource defaultDataSource = new BasicDataSource();
        defaultDataSource.setDriverClassName("com.mysql.jdbc.Driver");
        defaultDataSource.setUrl(url);
        defaultDataSource.setUsername(username);
        defaultDataSource.setPassword(password);
        return defaultDataSource;
    }
}

5. mybatis使用

1.到demo_ds_0和demo_ds_1庫中創建表

CREATE TABLE t_order_0 (id BIGINT(20) NOT NULL, description VARCHAR(255), order_id BIGINT(20), user_id BIGINT(20), PRIMARY KEY (id)); 
CREATE TABLE t_order_1 (id BIGINT(20) NOT NULL, description VARCHAR(255), order_id BIGINT(20), user_id BIGINT(20), PRIMARY KEY (id));  

對應的bean如下:

package cn.qlq.sharedjdbc.client;

import javax.persistence.Id;

import com.baomidou.mybatisplus.annotation.TableName;

import lombok.Data;

@Data
// 對應邏輯表名稱
@TableName("t_order")
public class TOrder {

    @Id
    private Long id;

    private Long order_id;

    private Long user_id;

    private String description;

}

2. pom引入配置

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-dbcp2 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-dbcp2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
            <version>4.0.0</version>
        </dependency>

3.java配置設置分片規則以及默認數據源

package cn.qlq.sharedjdbc.config;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.shardingsphere.api.config.sharding.ShardingRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.TableRuleConfiguration;
import org.apache.shardingsphere.api.config.sharding.strategy.ComplexShardingStrategyConfiguration;
import org.apache.shardingsphere.shardingjdbc.api.ShardingDataSourceFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DataSourceConfig {

    @Bean
    public static DataSource dataSource() throws SQLException {
        // 配置真實數據源
        Map<String, DataSource> dataSourceMap = new HashMap<>();

        // 配置第一個數據源
        BasicDataSource dataSource0 = createDataSource("jdbc:mysql://localhost:3306/demo_ds_0", "root", "123456");
        dataSourceMap.put("demo_ds_0", dataSource0);

        // 配置第二個數據源
        BasicDataSource dataSource1 = createDataSource("jdbc:mysql://localhost:3306/demo_ds_1", "root", "123456");
        dataSourceMap.put("demo_ds_1", dataSource1);

        // 配置默認數據源
        BasicDataSource defaultDataSource = createDataSource("jdbc:mysql://localhost:3306/test1", "root", "123456");
        dataSourceMap.put("defaultDataSource", defaultDataSource);

        // 配置Order表規則(t_order 是邏輯表名,
        // demo_ds_${0..1}.t_order_${0..1}是實際的節點數(groovy表達式))
        TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration("t_order",
                "demo_ds_${0..1}.t_order_${0..1}");
        // 配置分庫 + 分表策略
        orderTableRuleConfig.setDatabaseShardingStrategyConfig(
                new ComplexShardingStrategyConfiguration("user_id", new MyDBComplexShardingStrategy()));
        orderTableRuleConfig.setTableShardingStrategyConfig(
                new ComplexShardingStrategyConfiguration("order_id", new MyTableComplexShardingStrategy()));

        // 配置分片規則
        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();
        shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);
        // 設置默認數據源
        shardingRuleConfig.setDefaultDataSourceName("defaultDataSource");
        // 設置不分片的表
        shardingRuleConfig.setBindingTableGroups(Arrays.asList("country"));

        // 獲取數據源對象
        DataSource createDataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig,
                new Properties());
        return createDataSource;
    }

    private static BasicDataSource createDataSource(String url, String username, String password) {
        BasicDataSource defaultDataSource = new BasicDataSource();
        defaultDataSource.setDriverClassName("com.mysql.jdbc.Driver");
        defaultDataSource.setUrl(url);
        defaultDataSource.setUsername(username);
        defaultDataSource.setPassword(password);
        return defaultDataSource;
    }
}

 

分庫規則如下:

package cn.qlq.sharedjdbc.config;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.IteratorUtils;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;

public class MyDBComplexShardingStrategy implements ComplexKeysShardingAlgorithm<Comparable<?>> {

    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
            ComplexKeysShardingValue<Comparable<?>> shardingValue) {

        System.out.println("=====MyDBComplexShardingStrategy=====");
        System.out.println(availableTargetNames);
        System.out.println(shardingValue);

        if (CollectionUtils.isEmpty(availableTargetNames)) {
            throw new RuntimeException("可用數據庫為空");
        }

        List<String> result = new ArrayList<>();

        Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = shardingValue
                .getColumnNameAndShardingValuesMap();
        Set<String> keySet = columnNameAndShardingValuesMap.keySet();
        for (String key : keySet) {
            if (!"user_id".equals(key)) {
                continue;
            }

            Collection<Comparable<?>> collection = columnNameAndShardingValuesMap.get(key);
            Iterator<Comparable<?>> iterator = collection.iterator();
            while (iterator.hasNext()) {
                Integer next = Integer.valueOf(iterator.next().toString());
                Integer index = next % 2;
                String availableTargetName = IteratorUtils.get(availableTargetNames.iterator(), index);
                result.add(availableTargetName);
            }
        }

        System.out.println(result);

        return result;
    }

}

 

分表規則如下:

package cn.qlq.sharedjdbc.config;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.IteratorUtils;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.complex.ComplexKeysShardingValue;

public class MyTableComplexShardingStrategy implements ComplexKeysShardingAlgorithm<Comparable<?>> {

    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames,
            ComplexKeysShardingValue<Comparable<?>> shardingValue) {

        System.out.println("=====MyTableComplexShardingStrategy=====");
        System.out.println(availableTargetNames);
        System.out.println(shardingValue);

        if (CollectionUtils.isEmpty(availableTargetNames)) {
            throw new RuntimeException("可用數據表為空");
        }

        List<String> result = new ArrayList<>();

        Map<String, Collection<Comparable<?>>> columnNameAndShardingValuesMap = shardingValue
                .getColumnNameAndShardingValuesMap();
        Set<String> keySet = columnNameAndShardingValuesMap.keySet();
        for (String key : keySet) {
            if (!"order_id".equals(key)) {
                continue;
            }

            Collection<Comparable<?>> collection = columnNameAndShardingValuesMap.get(key);
            Iterator<Comparable<?>> iterator = collection.iterator();
            while (iterator.hasNext()) {
                Integer next = Integer.valueOf(iterator.next().toString());
                Integer index = next % 2;
                String availableTargetName = IteratorUtils.get(availableTargetNames.iterator(), index);
                result.add(availableTargetName);
            }
        }

        System.out.println(result);

        return result;
    }

}

 4.測試

(1)mapper

package cn.qlq.sharedjdbc.client;

import org.apache.ibatis.annotations.Mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;

@Mapper
public interface TOrderMapper extends BaseMapper<TOrder> {

}

(2)controller

package cn.qlq.sharedjdbc.client;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import cn.qlq.utils.SnowflakeIdWorker;

@RestController
@RequestMapping("tOrder")
public class TOrderController {

    private Map<String, Object> result = new HashMap<>();

    @Autowired
    private TOrderMapper tOrderMapper;

    @RequestMapping("testAdd")
    public Map<String, Object> testAdd() {

        for (int i = 0; i < 20; i++) {
            TOrder tOrder = new TOrder();
            tOrder.setId(new SnowflakeIdWorker(0, 0).nextId());
            tOrder.setOrder_id(RandomUtils.nextLong(0, 5000000));
            tOrder.setUser_id(RandomUtils.nextLong(0, 5000000));
            tOrder.setDescription("description " + i);
            tOrderMapper.insert(tOrder);
        }

        result.put("success", true);
        return result;
    }

    @RequestMapping("list")
    public Map<String, Object> list() {
        List<TOrder> selectList = tOrderMapper.selectList(null);

        result.put("data", selectList);
        return result;
    }

}

(3)雪華算法生成唯一ID

package cn.qlq.utils;

/**
 * Twitter_Snowflake<br>
 * SnowFlake的結構如下(每部分用-分開):<br>
 * 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 -
 * 000000000000 <br>
 * 1位標識,由於long基本類型在Java中是帶符號的,最高位是符號位,正數是0,負數是1,所以id一般是正數,最高位是0<br>
 * 41位時間截(毫秒級),注意,41位時間截不是存儲當前時間的時間截,而是存儲時間截的差值(當前時間截 - 開始時間截)
 * 得到的值),這里的的開始時間截,一般是我們的id生成器開始使用的時間,由我們程序來指定的(如下下面程序IdWorker類的startTime屬性)。
 * 41位的時間截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>
 * 10位的數據機器位,可以部署在1024個節點,包括5位datacenterId和5位workerId<br>
 * 12位序列,毫秒內的計數,12位的計數順序號支持每個節點每毫秒(同一機器,同一時間截)產生4096個ID序號<br>
 * 加起來剛好64位,為一個Long型。<br>
 * SnowFlake的優點是,整體上按照時間自增排序,並且整個分布式系統內不會產生ID碰撞(由數據中心ID和機器ID作區分),並且效率較高,
 * 經測試,SnowFlake每秒能夠產生26萬ID左右。
 */
public class SnowflakeIdWorker {

    // ==============================Fields===========================================
    /** 開始時間截 (2015-01-01) */
    private final long twepoch = 1420041600000L;

    /** 機器id所占的位數 */
    private final long workerIdBits = 5L;

    /** 數據標識id所占的位數 */
    private final long datacenterIdBits = 5L;

    /** 支持的最大機器id,結果是31 (這個移位算法可以很快的計算出幾位二進制數所能表示的最大十進制數) */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /** 支持的最大數據標識id,結果是31 */
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);

    /** 序列在id中占的位數 */
    private final long sequenceBits = 12L;

    /** 機器ID向左移12位 */
    private final long workerIdShift = sequenceBits;

    /** 數據標識id向左移17位(12+5) */
    private final long datacenterIdShift = sequenceBits + workerIdBits;

    /** 時間截向左移22位(5+5+12) */
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;

    /** 生成序列的掩碼,這里為4095 (0b111111111111=0xfff=4095) */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /** 工作機器ID(0~31) */
    private long workerId;

    /** 數據中心ID(0~31) */
    private long datacenterId;

    /** 毫秒內序列(0~4095) */
    private long sequence = 0L;

    /** 上次生成ID的時間截 */
    private long lastTimestamp = -1L;

    // ==============================Constructors=====================================
    /**
     * 構造函數
     * 
     * @param workerId
     *            工作ID (0~31)
     * @param datacenterId
     *            數據中心ID (0~31)
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(
                    String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(
                    String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    // ==============================Methods==========================================
    /**
     * 獲得下一個ID (該方法是線程安全的)
     * 
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        // 如果當前時間小於上一次ID生成的時間戳,說明系統時鍾回退過這個時候應當拋出異常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format(
                    "Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        // 如果是同一時間生成的,則進行毫秒內序列
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            // 毫秒內序列溢出
            if (sequence == 0) {
                // 阻塞到下一個毫秒,獲得新的時間戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        // 時間戳改變,毫秒內序列重置
        else {
            sequence = 0L;
        }

        // 上次生成ID的時間截
        lastTimestamp = timestamp;

        // 移位並通過或運算拼到一起組成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (datacenterId << datacenterIdShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * 阻塞到下一個毫秒,直到獲得新的時間戳
     * 
     * @param lastTimestamp
     *            上次生成ID的時間截
     * @return 當前時間戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 返回以毫秒為單位的當前時間
     * 
     * @return 當前時間(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    // ==============================Test=============================================
    /** 測試 */
    public static void main(String[] args) {
        SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);
        for (int i = 0; i < 1000; i++) {
            long id = idWorker.nextId();
            System.out.println(Long.toBinaryString(id));
            System.out.println(id);
        }
    }
}

訪問一次add后查看數據庫表:

(1)查看demo_ds_0.t_order_0。分片規則是user_id為偶數、order_id為偶數

SELECT * FROM demo_ds_0.t_order_0;

結果 

 

 (2)查看demo_ds_0.t_order_1。分片規則是user_id為偶數、order_id為奇數

select * from demo_ds_0.t_order_1;

結果:

 (3)查看demo_ds_1.t_order_0。分片規則是user_id為奇數、order_id為偶數

SELECT * FROM demo_ds_1.t_order_0;

結果:

 (4)查看demo_ds_1.t_order_1。分片規則是user_id為奇數、order_id為奇數

 

  至此實現了mybatis結合shardingjdbc實現分庫分表。可以實現對某些表進行分庫分表、也可以對不需要分片的使用默認的數據源(shardingRuleConfig.setDefaultDataSourceName("defaultDataSource");)。

  結合Mybatis使用的時候,對分片的表應當使用其邏輯表名稱。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM