MySQL Series, Part 6: Mycat Shard Routing Principles, Common Mycat Sharding Rules, and the Corresponding Source Code


I. Mycat Shard Routing Principles

Let's first look at how Mycat executes the following SQL statement:

select * from travelrecord where id in(5000001, 10000001);

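There are 3 shards: dn1, dn2, and dn3. The row with id=5000001 is on dn2, and the row with id=10000001 is on dn3.

Two problems can arise when running this query:

1) Scanning dn1, dn2, and dn3 in full, which wastes resources and hurts performance.

2) Scanning only one shard, which misses data.

In summary:

Do not scan too much --> poor performance

Do not scan too little --> missing data

So how does Mycat solve these problems?

Mycat uses Druid's DruidParser as its SQL analyzer/parser. Parsing consists of two main phases: Visitor and Statement.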

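Notes:

1) The Visitor phase extracts the following properties:

              which table

              the column list

              the condition information

              what kind of SQL it is

Once this information has been extracted, Mycat can use schema.xml and rule.xml to determine which shards the DML operation must go to.

2) The Statement phase: after this conversion, Mycat knows what kind of SQL is being executed (insert, delete, update, or select).

3) Rewriting the SQL

From the query conditions, Mycat knows which shards hold the requested data:

  dn2, id = 5000001

  dn3, id = 10000001

So the SQL is rewritten as follows:

select * from travelrecord where id = 5000001;   (executed on dn2)
select * from travelrecord where id = 10000001;  (executed on dn3)

4) The rewritten statements from step 3) are executed on dn2 and dn3 respectively, and the results returned by dn2 and dn3 are merged into the final result.

Remarks:

For multi-table join queries, the visitor walks through all the tables, fetches results from each shard, caches them, and finally computes the join result.

How the shard is determined: Mycat first checks whether the where clause contains the sharding column. If it does, the column value is combined with the settings in schema.xml and rule.xml to determine the shard. When the shard cannot be determined, Mycat queries all shards.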
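
The rewrite in step 3) can be sketched as follows. This is only an illustration of the idea, not Mycat's routing code: the class `InListRoutingSketch`, its method names, and the three id ranges are assumptions, chosen so that 5000001 lands on dn2 and 10000001 lands on dn3 as in the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InListRoutingSketch {
    // Hypothetical shard ranges; in Mycat the real ranges come from rule.xml.
    static final String[] NODES = {"dn1", "dn2", "dn3"};
    static final long[] START = {0L, 5_000_001L, 10_000_001L};
    static final long[] END = {5_000_000L, 10_000_000L, 15_000_000L};

    /** Returns the data node whose range contains id, or null if none does. */
    static String nodeFor(long id) {
        for (int i = 0; i < NODES.length; i++) {
            if (id >= START[i] && id <= END[i]) {
                return NODES[i];
            }
        }
        return null;
    }

    /** Groups the IN-list values by data node and builds one statement per node. */
    static Map<String, String> rewrite(String table, String column, long... ids) {
        Map<String, List<Long>> idsByNode = new LinkedHashMap<>();
        for (long id : ids) {
            String node = nodeFor(id);
            if (node == null) {
                // Out of scope for this sketch: no configured range covers the value.
                throw new IllegalArgumentException("no shard range covers id " + id);
            }
            idsByNode.computeIfAbsent(node, k -> new ArrayList<>()).add(id);
        }
        Map<String, String> sqlByNode = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> e : idsByNode.entrySet()) {
            List<Long> nodeIds = e.getValue();
            StringBuilder sql = new StringBuilder("select * from " + table + " where " + column);
            if (nodeIds.size() == 1) {
                sql.append(" = ").append(nodeIds.get(0));
            } else {
                sql.append(" in (");
                for (int i = 0; i < nodeIds.size(); i++) {
                    if (i > 0) {
                        sql.append(", ");
                    }
                    sql.append(nodeIds.get(i));
                }
                sql.append(")");
            }
            sqlByNode.put(e.getKey(), sql.toString());
        }
        return sqlByNode;
    }

    public static void main(String[] args) {
        // Prints one line per data node that has to be queried.
        rewrite("travelrecord", "id", 5000001L, 10000001L)
                .forEach((node, sql) -> System.out.println(node + ": " + sql));
    }
}
```

Only dn2 and dn3 receive a statement, so dn1 is never scanned and no row is missed.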

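II. Common Mycat Sharding Rules

1. Time-based: sharding by day, sharding by natural month, sharding by hour within a single month

2. Hash-based: fixed-slot hash sharding, date-range hash sharding, substring-number hash modulo range sharding, substring-number hash sharding, consistent hash sharding

3. Modulo-based: modulo sharding, modulo range sharding, range modulo sharding

4. Others: enumeration sharding, range-convention sharding, application-specified sharding, hot/cold data sharding

The common Mycat sharding rules are introduced below, based on the source code.

III. Introduction to the Common Mycat Sharding Rules

Note: sharding rules are all defined in the rule.xml file.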

    <!--
        tableRule tag: defines the sharding strategy for a table
    -->
    <tableRule name="rule1">
        <!--
        rule tag: the strategy definition
        -->
        <rule>
            <!--
            columns tag: the sharding column
            -->
            <columns>id</columns>
            <!--
            algorithm tag: name of the function that implements this tableRule's sharding strategy
            -->
            <algorithm>func1</algorithm>
        </rule>
    </tableRule>
    <!-- Define the sharding function -->
    <function name="func1" class="io.mycat.route.function.PartitionByLong">
        <property name="partitionCount">1,1,2,3,1</property><!-- partition counts -->
        <property name="partitionLength">128,128,128,128,128</property><!-- partition lengths -->
    </function>

1. Automatic range sharding

Configuration in rule.xml:

<tableRule name="auto-sharding-long">
   <rule>
      <columns>id</columns>
     <algorithm>rang-long</algorithm>
   </rule>
</tableRule>

<function name="rang-long"
        class="io.mycat.route.function.AutoPartitionByLong">
        <property name="mapFile">autopartition-long.txt</property>
</function>

Notes:

There are 3 shards. Shard 1 stores ids 1-500000, shard 2 stores ids 500001-1000000, and shard 3 stores ids 1000001-1500000.

insert into employee(id, name) values(1, 'Tom'); goes to shard 1

insert into employee(id, name) values(500002, 'Jack'); goes to shard 2

insert into employee(id, name) values(1000002, 'Lucy'); goes to shard 3
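
The range lookup can be sketched in a few lines of Java. This is a minimal illustration, not Mycat's implementation: the class `AutoRangeSketch` and the three map-file lines are assumptions derived from the ranges described above (node indexes 0, 1, 2 stand for shards 1, 2, 3), and the sketch parses plain numbers only.

```java
import java.util.ArrayList;
import java.util.List;

class AutoRangeSketch {
    /** One "start-end=node" line of the map file. */
    static final class Range {
        final long start;
        final long end;
        final int node;

        Range(long start, long end, int node) {
            this.start = start;
            this.end = end;
            this.node = node;
        }
    }

    /** Parses lines of the form "start-end=node", skipping comments and malformed lines. */
    static List<Range> parse(String... lines) {
        List<Range> ranges = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.startsWith("#") || line.startsWith("//")) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue;
            }
            String[] bounds = line.substring(0, eq).trim().split("-");
            long start = Long.parseLong(bounds[0].trim());
            long end = Long.parseLong(bounds[1].trim());
            int node = Integer.parseInt(line.substring(eq + 1).trim());
            ranges.add(new Range(start, end, node));
        }
        return ranges;
    }

    /** Returns the node index for id, the default node if configured (>= 0), or null. */
    static Integer route(List<Range> ranges, long id, int defaultNode) {
        for (Range r : ranges) {
            if (id >= r.start && id <= r.end) {
                return r.node;
            }
        }
        return defaultNode >= 0 ? defaultNode : null;
    }

    public static void main(String[] args) {
        // Hypothetical map-file content matching the three ranges in the text.
        List<Range> ranges = parse(
                "# range start-end = data node index",
                "1-500000=0",
                "500001-1000000=1",
                "1000001-1500000=2");
        System.out.println(route(ranges, 1L, -1));        // 0
        System.out.println(route(ranges, 500002L, -1));   // 1
        System.out.println(route(ranges, 1000002L, -1));  // 2
    }
}
```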

Corresponding code:

package io.mycat.route.function;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

import io.mycat.config.model.rule.RuleAlgorithm;

/**
 * auto partition by Long ,can be used in auto increment primary key partition
 * 
 * @author wuzhi
 */
public class AutoPartitionByLong extends AbstractPartitionAlgorithm implements RuleAlgorithm{

    private String mapFile;
    private LongRange[] longRongs;
    
    private int defaultNode = -1;
    @Override
    public void init() {

        initialize();
    }

    public void setMapFile(String mapFile) {
        this.mapFile = mapFile;
    }

    @Override
    public Integer calculate(String columnValue)  {
//        columnValue = NumberParseUtil.eliminateQoute(columnValue);
        try {
            long value = Long.parseLong(columnValue);
            Integer rst = null;
            for (LongRange longRang : this.longRongs) {
                if (value <= longRang.valueEnd && value >= longRang.valueStart) {
                    return longRang.nodeIndx;
                }
            }
            // The value is outside every range; fall back to the configured default node for now
            if (rst == null && defaultNode >= 0) {
                return defaultNode;
            }
            return rst;
        } catch (NumberFormatException e){
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please eliminate any quote and non number within it.").toString(),e);
        }
    }
    
    @Override
    public Integer[] calculateRange(String beginValue, String endValue)  {
        return AbstractPartitionAlgorithm.calculateSequenceRange(this, beginValue, endValue);
    }

    @Override
    public int getPartitionNum() {
//        int nPartition = longRongs.length;
        
        /*
         * fix #1284: the count here should be the number of distinct nodeIndex values across the ranges
         */
        Set<Integer> distNodeIdxSet = new HashSet<Integer>();
        for(LongRange range : longRongs) {
            distNodeIdxSet.add(range.nodeIndx);
        }
        int nPartition = distNodeIdxSet.size();
        return nPartition;
    }

    private void initialize() {
        BufferedReader in = null;
        try {
            // FileInputStream fin = new FileInputStream(new File(fileMapPath));
            InputStream fin = this.getClass().getClassLoader()
                    .getResourceAsStream(mapFile);
            if (fin == null) {
                throw new RuntimeException("can't find class resource file "
                        + mapFile);
            }
            in = new BufferedReader(new InputStreamReader(fin));
            LinkedList<LongRange> longRangeList = new LinkedList<LongRange>();

            for (String line = null; (line = in.readLine()) != null;) {
                line = line.trim();
                if (line.startsWith("#") || line.startsWith("//")) {
                    continue;
                }
                int ind = line.indexOf('=');
                if (ind < 0) {
                    System.out.println(" warn: bad line int " + mapFile + " :"
                            + line);
                    continue;
                }
                    String pairs[] = line.substring(0, ind).trim().split("-");
                    long longStart = NumberParseUtil.parseLong(pairs[0].trim());
                    long longEnd = NumberParseUtil.parseLong(pairs[1].trim());
                    int nodeId = Integer.parseInt(line.substring(ind + 1)
                            .trim());
                    longRangeList
                            .add(new LongRange(nodeId, longStart, longEnd));

            }
            longRongs = longRangeList.toArray(new LongRange[longRangeList
                    .size()]);
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else {
                throw new RuntimeException(e);
            }

        } finally {
            try {
                in.close();
            } catch (Exception e2) {
            }
        }
    }
    
    public int getDefaultNode() {
        return defaultNode;
    }

    public void setDefaultNode(int defaultNode) {
        this.defaultNode = defaultNode;
    }

    static class LongRange {
        public final int nodeIndx;
        public final long valueStart;
        public final long valueEnd;

        public LongRange(int nodeIndx, long valueStart, long valueEnd) {
            super();
            this.nodeIndx = nodeIndx;
            this.valueStart = valueStart;
            this.valueEnd = valueEnd;
        }

    }
}

2. Enumeration sharding

Stores data by category.

Configuration in rule.xml:

<tableRule name="sharding-by-intfile">
  <rule>
     <columns>sharding_id</columns>
     <algorithm>hash-int</algorithm>
  </rule>
</tableRule>
<function name="hash-int"
        class="io.mycat.route.function.PartitionByFileMap">
        <property name="mapFile">partition-hash-int.txt</property>
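        <property name="defaultNode">0</property> <!-- fallback rule: when no shard matches, insert the data into default shard 0 -->
 </function>

Note: defaultNode is a fallback. When a value matches no entry in the map file, the row is routed to default shard 0.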
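
The fallback behaviour can be sketched like this. The class `EnumShardSketch` and the enum values 10000 and 10010 are hypothetical, since the content of partition-hash-int.txt is not shown here; the sketch handles integer keys only.

```java
import java.util.HashMap;
import java.util.Map;

class EnumShardSketch {
    private final Map<Integer, Integer> valueToNode = new HashMap<>();
    private final int defaultNode;

    /** A defaultNode below 0 means that no default node is configured. */
    EnumShardSketch(int defaultNode) {
        this.defaultNode = defaultNode;
    }

    /** Registers one "enumValue=node" mapping, as a map-file line would. */
    void map(int enumValue, int node) {
        valueToNode.put(enumValue, node);
    }

    /** Returns the mapped node, the default node for unknown values, or null if there is none. */
    Integer route(String columnValue) {
        Integer node = valueToNode.get(Integer.valueOf(columnValue.trim()));
        if (node != null) {
            return node;
        }
        return defaultNode >= 0 ? defaultNode : null;
    }

    public static void main(String[] args) {
        EnumShardSketch rule = new EnumShardSketch(0);
        rule.map(10000, 0); // hypothetical enum values
        rule.map(10010, 1);
        System.out.println(rule.route("10010")); // 1
        System.out.println(rule.route("99999")); // 0, routed to the default node
    }
}
```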

Corresponding code:

package io.mycat.route.function;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import io.mycat.config.model.rule.RuleAlgorithm;

/**
 * 
 * @author mycat
 */
public class PartitionByFileMap extends AbstractPartitionAlgorithm implements RuleAlgorithm {

    private String mapFile;
    private Map<Object, Integer> app2Partition;
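    /**
     * Type of the keys in Map<Object, Integer> app2Partition: the default is 0; 0 means Integer, non-zero means String
     */
    private int type;
    
    /**
     * Key of the default node in the map
     */
    private static final String DEFAULT_NODE = "DEFAULT_NODE";
    
    /**
     * Default node: a value below 0 means no default node is set; 0 or above sets the default node
     * 
     * Purpose of the default node: during enumeration sharding, an unrecognized enum value
     *                is routed to the default node. If no default node is configured
     *                (a defaultNode below 0 means none is configured), an unrecognized
     *                enum value causes an error,
     *                like this:can't find datanode for sharding column:column_name val:ffffffff    
     */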
    private int defaultNode = -1;

    @Override
    public void init() {

        initialize();
    }

    public void setMapFile(String mapFile) {
        this.mapFile = mapFile;
    }
    
    public void setType(int type) {
        this.type = type;
    }

    public void setDefaultNode(int defaultNode) {
        this.defaultNode = defaultNode;
    }

    @Override
    public Integer calculate(String columnValue)  {
        try {
            Object value = columnValue;
            if (type == 0) {
                value = Integer.valueOf(columnValue);
            }
            Integer rst = null;
            Integer pid = app2Partition.get(value);
            if (pid != null) {
                rst = pid;
            } else {
                rst = app2Partition.get(DEFAULT_NODE);
            }
            return rst;
        } catch (NumberFormatException e){
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please check if the format satisfied.").toString(),e);
        }
    }
    
    @Override
    public int getPartitionNum() {
        Set<Integer> set = new HashSet<Integer>(app2Partition.values());
        int count = set.size();
        return count;
    }

    private void initialize() {
        BufferedReader in = null;
        try {
            // FileInputStream fin = new FileInputStream(new File(fileMapPath));
            InputStream fin = this.getClass().getClassLoader()
                    .getResourceAsStream(mapFile);
            if (fin == null) {
                throw new RuntimeException("can't find class resource file "
                        + mapFile);
            }
            in = new BufferedReader(new InputStreamReader(fin));
            
            app2Partition = new HashMap<Object, Integer>();
            
            for (String line = null; (line = in.readLine()) != null;) {
                line = line.trim();
                if (line.startsWith("#") || line.startsWith("//")) {
                    continue;
                }
                int ind = line.indexOf('=');
                if (ind < 0) {
                    continue;
                }
                try {
                    String key = line.substring(0, ind).trim();
                    int pid = Integer.parseInt(line.substring(ind + 1).trim());
                    if(type == 0) {
                        app2Partition.put(Integer.parseInt(key), pid);
                    } else {
                        app2Partition.put(key, pid);
                    }
                } catch (Exception e) {
                }
            }
            // Set the default node
            if(defaultNode >= 0) {
                app2Partition.put(DEFAULT_NODE, defaultNode);
            }
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else {
                throw new RuntimeException(e);
            }

        } finally {
            try {
                in.close();
            } catch (Exception e2) {
            }
        }
    }
}

3. Fixed-slot hash sharding (fixed-partition hash algorithm)

The fixed-partition hash algorithm supports at most 1024 partitions.

Configuration in rule.xml:

    <tableRule name="rule1">
        <!--
        rule tag: the strategy definition
        -->
        <rule>
            <!--
            columns tag: the sharding column
            -->
            <columns>id</columns>
            <!--
            algorithm tag: name of the function that implements this tableRule's sharding strategy
            -->
            <algorithm>func1</algorithm>
        </rule>
    </tableRule>
    <function name="func1" class="io.mycat.route.function.PartitionByLong">
        <property name="partitionCount">1,1,2,3,1</property><!-- partition counts -->
        <property name="partitionLength">128,128,128,128,128</property><!-- partition lengths -->
    </function>

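Notes:

1) partitionCount.length must equal partitionLength.length

2) sum(partitionCount[i] * partitionLength[i]) = 1024, that is, partitionCount[0]*partitionLength[0] + partitionCount[1]*partitionLength[1] + ... = 1024

For the configuration above: 1*128 + 1*128 + 2*128 + 3*128 + 1*128 = 1024

Example:

8 shards

Shard 1 has index 0: slots 0-127

Shard 2 has index 1: slots 128-255

...................

Shard 8 has index 7: slots 896-1023

How is the target shard determined? The remainder of the sharding id divided by 1024 selects the slot, and therefore the shard:

id % 1024 = 128 falls on shard 2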
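
The slot layout can be reproduced with a small table-driven sketch. It is written in the spirit of the rule described above rather than copied from Mycat's PartitionUtil: the class `FixedHashSketch` and its methods are assumptions, and using `Math.floorMod` to keep negative ids inside the 1024 slots is a choice made for this sketch.

```java
class FixedHashSketch {
    static final int SLOTS = 1024;

    /** Expands the count/length pairs into a slot -> shard index table with 1024 entries. */
    static int[] buildSlotTable(int[] count, int[] length) {
        if (count.length != length.length) {
            throw new IllegalArgumentException("partitionCount and partitionLength differ in length");
        }
        int total = 0;
        for (int i = 0; i < count.length; i++) {
            total += count[i] * length[i];
        }
        if (total != SLOTS) {
            throw new IllegalArgumentException("slots sum to " + total + ", expected " + SLOTS);
        }
        int[] table = new int[SLOTS];
        int slot = 0;
        int shard = 0;
        for (int i = 0; i < count.length; i++) {
            for (int c = 0; c < count[i]; c++) {
                // Each of the count[i] shards in this group owns length[i] consecutive slots.
                for (int k = 0; k < length[i]; k++) {
                    table[slot++] = shard;
                }
                shard++;
            }
        }
        return table;
    }

    /** Maps an id to its shard index through the slot table. */
    static int route(int[] table, long id) {
        return table[(int) Math.floorMod(id, (long) SLOTS)];
    }

    public static void main(String[] args) {
        int[] table = buildSlotTable(new int[] {1, 1, 2, 3, 1}, new int[] {128, 128, 128, 128, 128});
        System.out.println(route(table, 128L));  // 1, i.e. the second shard
        System.out.println(route(table, 1023L)); // 7, i.e. the eighth shard
    }
}
```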

Corresponding code:

package io.mycat.route.function;

import io.mycat.config.model.rule.RuleAlgorithm;
import io.mycat.route.util.PartitionUtil;

public final class PartitionByLong extends AbstractPartitionAlgorithm implements RuleAlgorithm {
    protected int[] count;
    protected int[] length;
    protected PartitionUtil partitionUtil;

    private static int[] toIntArray(String string) {
        String[] strs = io.mycat.util.SplitUtil.split(string, ',', true);
        int[] ints = new int[strs.length];
        for (int i = 0; i < strs.length; ++i) {
            ints[i] = Integer.parseInt(strs[i]);
        }
        return ints;
    }

    public void setPartitionCount(String partitionCount) {
        this.count = toIntArray(partitionCount);
    }

    public void setPartitionLength(String partitionLength) {
        this.length = toIntArray(partitionLength);
    }

    @Override
    public void init() {
        partitionUtil = new PartitionUtil(count, length);

    }

    @Override
    public Integer calculate(String columnValue)  {
//        columnValue = NumberParseUtil.eliminateQoute(columnValue);
        try {
            long key = Long.parseLong(columnValue);
            return partitionUtil.partition(key);
        } catch (NumberFormatException e){
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please eliminate any quote and non number within it.").toString(),e);
        }
    }
    
    @Override
    public Integer[] calculateRange(String beginValue, String endValue)  {
        return AbstractPartitionAlgorithm.calculateSequenceRange(this, beginValue, endValue);
    }

//    @Override
//    public int getPartitionCount() {
//        int nPartition = 0;
//        for(int i = 0; i < count.length; i++) {
//            nPartition += count[i];
//        }
//        return nPartition;
//    }
    
}

4. Modulo sharding

sharding column id % number of shards = shard index

Configuration in rule.xml:

    <tableRule name="mod-long">
        <rule>
            <columns>id</columns><!-- sharding column -->
            <algorithm>mod-long</algorithm>
        </rule>
    </tableRule>
    <function name="mod-long" class="io.mycat.route.function.PartitionByMod">
        <!-- how many data nodes -->
        <property name="count">3</property><!-- number of shards -->
    </function>
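
As a quick check of the formula with count = 3, the following sketch applies the same absolute-value-then-modulo arithmetic that the listing below uses. The class `ModShardSketch` is a hypothetical name used only for this illustration.

```java
import java.math.BigInteger;

class ModShardSketch {
    /** Shard index = |columnValue| mod count, computed with BigInteger so large ids are safe. */
    static int route(String columnValue, int count) {
        BigInteger value = new BigInteger(columnValue.trim()).abs();
        return value.mod(BigInteger.valueOf(count)).intValue();
    }

    public static void main(String[] args) {
        for (int id = 1; id <= 6; id++) {
            // Consecutive ids rotate through shard indexes 1, 2, 0, 1, 2, 0.
            System.out.println("id " + id + " -> shard index " + route(Integer.toString(id), 3));
        }
    }
}
```

Because consecutive ids rotate across shards, rows spread evenly, but changing count moves most existing rows to a different shard index.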

Corresponding code:

package io.mycat.route.function;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.mycat.config.model.rule.RuleAlgorithm;

/**
 * number column partion by Mod operator
 * if count is 10 then 0 to 0,21 to 1 (21 % 10 =1)
 * @author wuzhih
 *
 */
public class PartitionByMod extends AbstractPartitionAlgorithm implements RuleAlgorithm  {

    private int count;
    @Override
    public void init() {
    
        
    }



    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public Integer calculate(String columnValue)  {
//        columnValue = NumberParseUtil.eliminateQoute(columnValue);
        try {
            BigInteger bigNum = new BigInteger(columnValue).abs();
            return (bigNum.mod(BigInteger.valueOf(count))).intValue();
        } catch (NumberFormatException e){
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please eliminate any quote and non number within it.").toString(),e);
        }

    }
    

    @Override
    public int getPartitionNum() {
        int nPartition = this.count;
        return nPartition;
    }

    private static void hashTest()  {
        PartitionByMod hash=new PartitionByMod();
        hash.setCount(11);
        hash.init();
        
        int[] bucket=new int[hash.count];
        
        Map<Integer,List<Integer>> hashed=new HashMap<>();
        
        int total=1000_0000;// data volume
        int c=0;
        for(int i=100_0000;i<total+100_0000;i++){// assume the sharding key starts at 1,000,000
            c++;
            int h=hash.calculate(Integer.toString(i));
            bucket[h]++;
            List<Integer> list=hashed.get(h);
            if(list==null){
                list=new ArrayList<>();
                hashed.put(h, list);
            }
            list.add(i);
        }
        System.out.println(c+"   "+total);
        double d=0;
        c=0;
        int idx=0;
        System.out.println("index    bucket   ratio");
        for(int i:bucket){
            d+=i/(double)total;
            c+=i;
            System.out.println(idx+++"  "+i+"   "+(i/(double)total));
        }
        System.out.println(d+"  "+c);
        
        System.out.println("****************************************************");
        rehashTest(hashed.get(0));
    }
    private static void rehashTest(List<Integer> partition)  {
        PartitionByMod hash=new PartitionByMod();
        hash.count=110;// number of shards
        hash.init();
        
        int[] bucket=new int[hash.count];
        
        int total=partition.size();// data volume
        int c=0;
        for(int i:partition){// assume the sharding key starts at 1,000,000
            c++;
            int h=hash.calculate(Integer.toString(i));
            bucket[h]++;
        }
        System.out.println(c+"   "+total);
        c=0;
        int idx=0;
        System.out.println("index    bucket   ratio");
        for(int i:bucket){
            c+=i;
            System.out.println(idx+++"  "+i+"   "+(i/(double)total));
        }
    }
    public static void main(String[] args)  {
//        hashTest();
        PartitionByMod partitionByMod = new PartitionByMod();
        partitionByMod.count=8;
        partitionByMod.calculate("\"6\"");
        partitionByMod.calculate("\'6\'");
    }
}

5. Natural-month sharding

Shards data by natural (calendar) month.

Configuration in rule.xml:

    <tableRule name="sharding-by-month">
        <rule>
            <columns>create_time</columns>
            <algorithm>partbymonth</algorithm>
        </rule>
    </tableRule>
    <function name="partbymonth"
        class="io.mycat.route.function.PartitionByMonth">
        <property name="dateFormat">yyyy-MM-dd</property>
        <property name="sBeginDate">2015-01-01</property>
    </function>

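Notes:

If the data spans more months than there are shards, set sEndDate to handle it.

For example, with 3 shards and rows dated 2015-01-12, 2015-02-12, 2015-03-12, and 2016-11-12, the months outnumber the shards. Setting sEndDate = 2015-04-12 is meant to place 4 months on each shard, laid out as in the table, so 2016-11-12 falls on shard 2:

    shard 0    shard 1    shard 2
    1          5          9
    2          6          10
    3          7          11
    4          8          12

Note that the code listed below wraps cyclically rather than in blocks of months: init() sets nPartition to the number of months from sBeginDate through sEndDate (4 here), and calculate() takes the month offset from sBeginDate modulo nPartition. 2016-11-12 has offset 22, and 22 % 4 = 2, so it still lands on partition 2, but 2015-04-12 would map to partition index 3.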
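
The month arithmetic in the listing below can be exercised in isolation. This sketch restates it with java.time instead of Calendar; the class `MonthShardSketch` and its method names are assumptions, and `Math.floorMod` stands in for the wrap-around helper in the listing.

```java
import java.time.LocalDate;

class MonthShardSketch {
    /** Number of whole calendar months from begin's month to date's month (may be negative). */
    static int monthOffset(LocalDate begin, LocalDate date) {
        return (date.getYear() - begin.getYear()) * 12
                + date.getMonthValue() - begin.getMonthValue();
    }

    /** Partition index for value; end == null means no wrap-around (one partition per month). */
    static int route(LocalDate begin, LocalDate end, LocalDate value) {
        int target = monthOffset(begin, value);
        if (end == null) {
            return target;
        }
        int partitions = monthOffset(begin, end) + 1;
        if (partitions <= 0) {
            throw new IllegalArgumentException("end date is before begin date");
        }
        // Wrap the month offset into [0, partitions).
        return Math.floorMod(target, partitions);
    }

    public static void main(String[] args) {
        LocalDate begin = LocalDate.parse("2015-01-01");
        LocalDate end = LocalDate.parse("2015-04-12");
        String[] samples = {"2015-01-12", "2015-02-12", "2015-03-12", "2016-11-12"};
        for (String s : samples) {
            // Prints partition indexes 0, 1, 2 and 2 for the four sample dates.
            System.out.println(s + " -> " + route(begin, end, LocalDate.parse(s)));
        }
    }
}
```

With sBeginDate = 2015-01-01 and sEndDate = 2015-04-12 there are 4 partition indexes (0 to 3), so April 2015 would need a fourth data node under this reading of the code.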


Corresponding code:

package io.mycat.route.function;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;

import io.mycat.config.model.rule.RuleAlgorithm;
import org.apache.log4j.Logger;

/**
 * Example: partition by a month column, one shard per natural month; also a sample of how the between operation is resolved
 * 
 * @author wzh
 * 
 */
public class PartitionByMonth extends AbstractPartitionAlgorithm implements
        RuleAlgorithm {
    private static final Logger LOGGER = Logger.getLogger(PartitionByDate.class);
    private String sBeginDate;
    private String dateFormat;
    private String sEndDate;
    private Calendar beginDate;
    private Calendar endDate;
    private int nPartition;

    private ThreadLocal<SimpleDateFormat> formatter;

    @Override
    public void init() {
        try {
            beginDate = Calendar.getInstance();
            beginDate.setTime(new SimpleDateFormat(dateFormat)
                    .parse(sBeginDate));
            formatter = new ThreadLocal<SimpleDateFormat>() {
                @Override
                protected SimpleDateFormat initialValue() {
                    return new SimpleDateFormat(dateFormat);
                }
            };
            if(sEndDate!=null&&!sEndDate.equals("")) {
                endDate = Calendar.getInstance();
                endDate.setTime(new SimpleDateFormat(dateFormat).parse(sEndDate));
                nPartition = ((endDate.get(Calendar.YEAR) - beginDate.get(Calendar.YEAR)) * 12
                                + endDate.get(Calendar.MONTH) - beginDate.get(Calendar.MONTH)) + 1;

                if (nPartition <= 0) {
                    throw new java.lang.IllegalArgumentException("Incorrect time range for month partitioning!");
                }
            } else {
                nPartition = -1;
            }
        } catch (ParseException e) {
            throw new java.lang.IllegalArgumentException(e);
        }
    }

    /**
     * For circulatory partition, calculated value of target partition needs to be
     * rotated to fit the partition range
     */
    private int reCalculatePartition(int targetPartition) {
        /**
         * If target date is previous of start time of partition setting, shift
         * the delta range between target and start date to be positive value
         */
        if (targetPartition < 0) {
            targetPartition = nPartition - (-targetPartition) % nPartition;
        }

        if (targetPartition >= nPartition) {
            targetPartition =  targetPartition % nPartition;
        }

        return targetPartition;
    }

    @Override
    public Integer calculate(String columnValue)  {
        try {
            int targetPartition;
            Calendar curTime = Calendar.getInstance();
            curTime.setTime(formatter.get().parse(columnValue));
            targetPartition = ((curTime.get(Calendar.YEAR) - beginDate.get(Calendar.YEAR))
                    * 12 + curTime.get(Calendar.MONTH)
                    - beginDate.get(Calendar.MONTH));

            /**
             * For circulatory partition, calculated value of target partition needs to be
             * rotated to fit the partition range
              */
            if (nPartition > 0) {
                targetPartition = reCalculatePartition(targetPartition);
            }
            return targetPartition;

        } catch (ParseException e) {
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please check if the format satisfied.").toString(),e);
        }
    }

    @Override
    public Integer[] calculateRange(String beginValue, String endValue) {
        try {
            int startPartition, endPartition;
            Calendar partitionTime = Calendar.getInstance();
            SimpleDateFormat format = new SimpleDateFormat(dateFormat);
            partitionTime.setTime(format.parse(beginValue));
            startPartition = ((partitionTime.get(Calendar.YEAR) - beginDate.get(Calendar.YEAR))
                    * 12 + partitionTime.get(Calendar.MONTH)
                    - beginDate.get(Calendar.MONTH));
            partitionTime.setTime(format.parse(endValue));
            endPartition = ((partitionTime.get(Calendar.YEAR) - beginDate.get(Calendar.YEAR))
                    * 12 + partitionTime.get(Calendar.MONTH)
                    - beginDate.get(Calendar.MONTH));

            List<Integer> list = new ArrayList<>();

            while (startPartition <= endPartition) {
                Integer nodeValue = reCalculatePartition(startPartition);
                if (Collections.frequency(list, nodeValue) < 1)
                    list.add(nodeValue);
                startPartition++;
            }
            int size = list.size();
            return (list.toArray(new Integer[size]));
        } catch (ParseException e) {
            LOGGER.error("error",e);
            return new Integer[0];
        }
    }
    
    @Override
    public int getPartitionNum() {
        int nPartition = this.nPartition;
        return nPartition;
    }

    public void setsBeginDate(String sBeginDate) {
        this.sBeginDate = sBeginDate;
    }

    public void setDateFormat(String dateFormat) {
        this.dateFormat = dateFormat;
    }

    public void setsEndDate(String sEndDate) {
        this.sEndDate = sEndDate;
    }

}

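6. Prefix-pattern modulo sharding

Take the first prefixLength characters of the value, sum the character codes returned by charAt(i) into an integer, then take that sum modulo the partition length patternValue. The result is the partition number.

Configuration in rule.xml: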

    <tableRule name="partitionbyprefixpattern">
        <rule>
            <columns>id</columns>
            <algorithm>partitionbyprefixpattern</algorithm>
        </rule>
    </tableRule>
    <function name="partitionbyprefixpattern" class="io.mycat.route.function.PartitionByPrefixPattern">
        <property name="patternValue">3</property>  <!-- partition length / number of partitions -->
        <property name="prefixLength">6</property>  <!-- how many leading characters to take -->
    </function>

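Notes:

Suppose the data looks like this:

year-month + region + serial number

201801 01   10001

Prefix-pattern modulo sharding fits this case: take the first 6 characters of the sharding column, 201801, sum the character codes returned by charAt(i) into an integer, then take that sum modulo the partition length 3. The result is the partition number. In the code listed below, calculate() then looks this result up in the ranges loaded from mapFile to obtain the node index.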
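
The arithmetic for the sample value works out as follows. This sketch covers only the prefix sum and the modulo step; the class `PrefixPatternSketch` is a hypothetical name, and the range lookup against mapFile is left out.

```java
class PrefixPatternSketch {
    /** Sums the character codes of the first prefixLength characters (or fewer if the value is short). */
    static int prefixSum(String value, int prefixLength) {
        int n = Math.min(prefixLength, value.length());
        int sum = 0;
        for (int i = 0; i < n; i++) {
            sum += value.charAt(i);
        }
        return sum;
    }

    /** Prefix sum modulo patternValue. */
    static int bucket(String value, int prefixLength, int patternValue) {
        return prefixSum(value, prefixLength) % patternValue;
    }

    public static void main(String[] args) {
        String id = "2018010110001"; // 201801 + 01 + 10001
        // '2'=50, '0'=48, '1'=49, '8'=56, '0'=48, '1'=49, so the sum is 300.
        System.out.println(prefixSum(id, 6)); // 300
        System.out.println(bucket(id, 6, 3)); // 0
    }
}
```

Every id that starts with 201801 produces the same sum, so all rows of one year-month end up in the same partition.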

Corresponding code:

package io.mycat.route.function;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

import io.mycat.config.model.rule.RuleAlgorithm;
import io.mycat.route.function.AutoPartitionByLong.LongRange;

/**
 * partition by Prefix length ,can be used in String partition
 * 
 * @author hexiaobin
 */
public class PartitionByPrefixPattern extends AbstractPartitionAlgorithm implements RuleAlgorithm {
    private static final int PARTITION_LENGTH = 1024;
    private int patternValue = PARTITION_LENGTH;// partition length, the modulus (defaults to 1024)
    private int prefixLength;// number of leading characters whose ASCII codes are summed
    private String mapFile;
    private LongRange[] longRongs;

    @Override
    public void init() {

        initialize();
    }

    public void setMapFile(String mapFile) {
        this.mapFile = mapFile;
    }

    public void setPatternValue(int patternValue) {
        this.patternValue = patternValue;
    }

    public void setPrefixLength(int prefixLength) {
        this.prefixLength = prefixLength;
    }

    @Override
    public Integer calculate(String columnValue)  {
        try {
            int Length = Integer.valueOf(prefixLength);

            Length = columnValue.length() < Length ? columnValue.length() : Length;
            int sum = 0;
            for (int i = 0; i < Length; i++) {
                sum = sum + columnValue.charAt(i);
            }
            Integer rst = null;
            for (LongRange longRang : this.longRongs) {
                long hash = sum % patternValue;
                if (hash <= longRang.valueEnd && hash >= longRang.valueStart) {
                    return longRang.nodeIndx;
                }
            }
            return rst;
        } catch (NumberFormatException e){
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please eliminate any quote and non number within it.").toString(),e);
        }
    }
    
    @Override
    public int getPartitionNum() {
//        int nPartition = this.longRongs.length;
        /*
         * fix #1284: the count here should be the number of distinct nodeIndex values across the ranges
         */
        Set<Integer> distNodeIdxSet = new HashSet<Integer>();
        for(LongRange range : longRongs) {
            distNodeIdxSet.add(range.nodeIndx);
        }
        int nPartition = distNodeIdxSet.size();
        return nPartition;
    }

    private void initialize() {
        BufferedReader in = null;
        try {
            // FileInputStream fin = new FileInputStream(new File(fileMapPath));
            InputStream fin = this.getClass().getClassLoader()
                    .getResourceAsStream(mapFile);
            if (fin == null) {
                throw new RuntimeException("can't find class resource file "
                        + mapFile);
            }
            in = new BufferedReader(new InputStreamReader(fin));
            LinkedList<LongRange> longRangeList = new LinkedList<LongRange>();

            for (String line = null; (line = in.readLine()) != null;) {
                line = line.trim();
                if (line.startsWith("#") || line.startsWith("//")) {
                    continue;
                }
                int ind = line.indexOf('=');
                if (ind < 0) {
                    System.out.println(" warn: bad line int " + mapFile + " :"
                            + line);
                    continue;
                }
                    String pairs[] = line.substring(0, ind).trim().split("-");
                    long longStart = NumberParseUtil.parseLong(pairs[0].trim());
                    long longEnd = NumberParseUtil.parseLong(pairs[1].trim());
                    int nodeId = Integer.parseInt(line.substring(ind + 1)
                            .trim());
                    longRangeList
                            .add(new LongRange(nodeId, longStart, longEnd));

            }
            longRongs = longRangeList.toArray(new LongRange[longRangeList
                    .size()]);
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else {
                throw new RuntimeException(e);
            }

        } finally {
            try {
                in.close();
            } catch (Exception e2) {
            }
        }
    }

    static class LongRange {
        public final int nodeIndx;
        public final long valueStart;
        public final long valueEnd;

        public LongRange(int nodeIndx, long valueStart, long valueEnd) {
            super();
            this.nodeIndx = nodeIndx;
            this.valueStart = valueStart;
            this.valueEnd = valueEnd;
        }

    }
}

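7. Hot/cold data sharding

Log data is queried by date and split into hot and cold data: the most recent n months are queried from the real-time transaction database, and data older than n months is sharded in chunks of m days.

Configuration in rule.xml: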

    <tableRule name="sharding-by-date">
        <rule>
            <columns>create_time</columns>
            <algorithm>sharding-by-hotdate</algorithm>
        </rule>
    </tableRule>

    <function name="sharding-by-hotdate" class="io.mycat.route.function.PartitionByHotDate">
        <property name="dateFormat">yyyy-MM-dd</property> <!-- date format -->
        <property name="sLastDay">30</property> <!-- how many days of data the hot database keeps -->
        <property name="sPartionDay">30</property> <!-- shard data older than the hot window in chunks of this many days -->
    </function>
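
To see how dates map to partitions with sLastDay = 30 and sPartionDay = 30, the calculation from the listing below can be replayed against a fixed "now". The class `HotDateSketch` and the reference date 2018-06-30 are assumptions made for reproducibility; the real rule reads the current time on every call.

```java
import java.time.LocalDate;

class HotDateSketch {
    static final long ONE_DAY = 86_400_000L;

    /** Converts an ISO date (yyyy-MM-dd) to milliseconds at the start of that day. */
    static long millis(String isoDate) {
        return LocalDate.parse(isoDate).toEpochDay() * ONE_DAY;
    }

    /** Partition 0 is the hot store; older data is cut into chunks of partitionDays, starting at 1. */
    static int route(long nowMillis, long targetMillis, int lastDays, int partitionDays) {
        long diffDays = (nowMillis - targetMillis) / ONE_DAY + 1;
        if (diffDays - lastDays <= 0 || diffDays < 0) {
            return 0;
        }
        long hotBoundary = nowMillis - lastDays * ONE_DAY;
        return (int) ((hotBoundary - targetMillis) / (partitionDays * ONE_DAY)) + 1;
    }

    public static void main(String[] args) {
        long now = millis("2018-06-30"); // fixed reference date, an assumption of this sketch
        String[] samples = {"2018-06-20", "2018-05-20", "2018-05-01"};
        for (String s : samples) {
            // Prints partition indexes 0, 1 and 2 for the three sample dates.
            System.out.println(s + " -> " + route(now, millis(s), 30, 30));
        }
    }
}
```

Because the boundary moves with the current time, the same create_time value can map to a different partition index on a later day.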

Corresponding code:

package io.mycat.route.function;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.config.model.rule.RuleAlgorithm;

/**
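 * Query log data by date with a hot/cold data split: the most recent n months go to the real-time transaction database, and data older than n months is sharded in chunks of m days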
 * 
 * @author sw
 * 
 * <tableRule name="sharding-by-date">
      <rule>
        <columns>create_time</columns>
        <algorithm>sharding-by-hotdate</algorithm>
      </rule>
   </tableRule>  
<function name="sharding-by-hotdate" class="org.opencloudb.route.function.PartitionByHotDate">
    <property name="dateFormat">yyyy-MM-dd</property>
    <property name="sLastDay">10</property>
    <property name="sPartionDay">30</property>
  </function>
 */
public class PartitionByHotDate extends AbstractPartitionAlgorithm implements RuleAlgorithm {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionByHotDate.class);

    private String dateFormat;
    private String sLastDay;
    private String sPartionDay;

    private long sLastTime;
    private long partionTime;
    private ThreadLocal<SimpleDateFormat> formatter;
    
    private long beginDate;

    private static final long oneDay = 86400000;

    @Override
    public void init() {
        try {
            formatter = new ThreadLocal<SimpleDateFormat>() {
                @Override
                protected SimpleDateFormat initialValue() {
                    return new SimpleDateFormat(dateFormat);
                }
            };
            sLastTime = Integer.valueOf(sLastDay);
            partionTime = Integer.parseInt(sPartionDay) * oneDay;
        } catch (Exception e) {
            throw new java.lang.IllegalArgumentException(e);
        }
    }

    @Override
    public Integer calculate(String columnValue)  {
        Integer targetPartition = -1;
        try {
            long targetTime = formatter.get().parse(columnValue).getTime();
            Calendar now = Calendar.getInstance();
            long nowTime = now.getTimeInMillis();
            
            beginDate = nowTime - sLastTime * oneDay;
            
            long diffDays = (nowTime - targetTime) / (1000 * 60 * 60 * 24) + 1;
            if(diffDays-sLastTime <= 0 || diffDays<0 ){
                targetPartition = 0;
            }else{
                targetPartition = (int) ((beginDate - targetTime) / partionTime) + 1;
            }
            
            LOGGER.debug("PartitionByHotDate calculate for " + columnValue + " return " + targetPartition);
            return targetPartition;
        } catch (ParseException e) {
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please check if the format satisfied.").toString(),e);
        }
    }

    @Override
    public Integer[] calculateRange(String beginValue, String endValue)  {
        Integer[] targetPartition = null;
        try {
            long startTime = formatter.get().parse(beginValue).getTime();
            long endTime = formatter.get().parse(endValue).getTime();
            Calendar now = Calendar.getInstance();
            long nowTime = now.getTimeInMillis();
            
            long limitDate = nowTime - sLastTime * oneDay;
            long diffDays = (nowTime - startTime) / (1000 * 60 * 60 * 24) + 1;
            if(diffDays-sLastTime <= 0 || diffDays<0 ){
                Integer [] re = new Integer[1];
                re[0] = 0;
                targetPartition = re ;
            }else{
                Integer [] re = null;
                Integer begin = 0, end = 0;
                end = this.calculate(beginValue);
                boolean hasLimit = false;
                if(endTime-limitDate > 0){
                    endTime = limitDate;
                    hasLimit = true;
                }
                begin = this.calculate(formatter.get().format(endTime));
                if(begin == null || end == null){
                    return re;
                }
                if (end >= begin) {
                    int len = end-begin+1;
                    if(hasLimit){
                        re = new Integer[len+1];
                        re[0] = 0;
                        for(int i =0;i<len;i++){
                            re[i+1]=begin+i;
                        }
                    }else{
                        re = new Integer[len];
                        for(int i=0;i<len;i++){
                            re[i]=begin+i;
                        }
                    }
                    return re;
                }else{
                    return re;
                }
            }
        } catch (ParseException e) {
            throw new IllegalArgumentException(new StringBuilder().append("endValue:").append(endValue).append(" Please check if the format satisfied.").toString(),e);
        }
        return targetPartition;
    }

    public void setsPartionDay(String sPartionDay) {
        this.sPartionDay = sPartionDay;
    }
    public void setDateFormat(String dateFormat) {
        this.dateFormat = dateFormat;
    }
    public String getsLastDay() {
        return sLastDay;
    }
    public void setsLastDay(String sLastDay) {
        this.sLastDay = sLastDay;
    }
}

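8. Consistent hash sharding

1) First compute a hash for each MySQL server (node) and place it on a circle (continuum) covering 0 to 2^32.

2) For each physical MySQL server, create multiple virtual nodes, hash them, and map them onto the same circle.

3) Then hash the data onto the circle and search clockwise from that position; the data is stored on the server that owns the first virtual node found. If the search passes 2^32 without finding a server, the data is stored on the first MySQL server on the circle.

Characteristic: the virtual nodes spread data evenly across the servers.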
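
The three steps above can be sketched with a sorted map playing the role of the circle. This is a minimal illustration, not Mycat's implementation: HashRingSketch and its methods are hypothetical names, and CRC32 from the JDK stands in for the murmur hash so that the example has no external dependency.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

final class HashRingSketch {
    // Position on the circle (0 .. 2^32-1) -> index of the physical node that owns it.
    private final TreeMap<Long, Integer> ring = new TreeMap<>();

    HashRingSketch(int nodeCount, int virtualNodesPerNode) {
        for (int node = 0; node < nodeCount; node++) {
            for (int v = 0; v < virtualNodesPerNode; v++) {
                // Steps 1 and 2: hash every virtual node onto the circle.
                ring.put(hash("SHARD-" + node + "-NODE-" + v), node);
            }
        }
    }

    // Stand-in hash function (assumption): CRC32 yields an unsigned 32-bit value.
    static long hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Step 3: walk clockwise to the first virtual node, wrapping to the start if none is found.
    int nodeFor(String key) {
        Map.Entry<Long, Integer> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        HashRingSketch three = new HashRingSketch(3, 160);
        HashRingSketch four = new HashRingSketch(4, 160);
        int moved = 0;
        for (int id = 1_000_000; id < 1_010_000; id++) {
            String key = Integer.toString(id);
            if (three.nodeFor(key) != four.nodeFor(key)) {
                moved++;
            }
        }
        System.out.println("keys moved after adding a node: " + moved + " of 10000");
    }
}
```

ceilingEntry performs the clockwise search, and falling back to firstEntry is the wrap-around described in step 3. When a fourth node is added, each key either stays on its previous node or moves to the new node; it never moves between the existing nodes.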

Configuration in rule.xml:

<tableRule name="sharding-by-murmur">
    <rule>
        <columns>id</columns>
        <algorithm>murmur</algorithm>
    </rule>
</tableRule>
<function name="murmur"
        class="io.mycat.route.function.PartitionByMurmurHash">
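        <property name="seed">0</property><!-- defaults to 0 -->
        <property name="count">2</property><!-- number of database nodes to shard across; required, otherwise sharding cannot work -->
        <property name="virtualBucketTimes">160</property><!-- each physical database node is mapped to this many virtual nodes; the default is 160, i.e. the number of virtual nodes is 160 times the number of physical nodes -->
        <!-- <property name="weightMapFile">weightMapFile</property> Node weights; a node without an explicit weight defaults to 1. Written in properties-file format: the key is the node index, an integer from 0 to count-1, and the value is the node weight. Every weight must be a positive integer, otherwise 1 is used instead -->
        <!-- <property name="bucketMapPath">/etc/mycat/bucketMapPath</property> 
            Used in testing to observe how physical and virtual nodes are distributed. If this property is set, the mapping from each virtual node's murmur hash to its physical node is written to this file line by line. There is no default; if it is not set, nothing is written -->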
    </function>
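
As a rough illustration of how weightMapFile and virtualBucketTimes combine, the sketch below parses weights in properties format and computes the number of virtual nodes per shard as virtualBucketTimes multiplied by the shard's weight. WeightMapSketch and its methods are hypothetical names, and the weight text is passed inline here rather than loaded from a classpath file as Mycat does.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class WeightMapSketch {
    /** Parses "nodeIndex=weight" lines; non-positive weights are replaced by 1. */
    static Map<Integer, Integer> parseWeights(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<Integer, Integer> weights = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            int weight = Integer.parseInt(props.getProperty(key).trim());
            weights.put(Integer.parseInt(key.trim()), weight > 0 ? weight : 1);
        }
        return weights;
    }

    /** Virtual nodes for one shard; shards without an entry use the default weight of 1. */
    static int virtualNodes(Map<Integer, Integer> weights, int shard, int virtualBucketTimes) {
        return virtualBucketTimes * weights.getOrDefault(shard, 1);
    }

    public static void main(String[] args) {
        Map<Integer, Integer> weights = parseWeights("0=1\n1=3\n");
        for (int shard = 0; shard < 3; shard++) {
            System.out.println("shard " + shard + ": " + virtualNodes(weights, shard, 160) + " virtual nodes");
        }
    }
}
```

A shard with weight 3 gets three times as many positions on the circle as a shard with weight 1, so it should receive roughly three times as much data.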

Corresponding code:

package io.mycat.route.function;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.SortedMap;
import java.util.TreeMap;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import io.mycat.config.model.rule.RuleAlgorithm;
import io.mycat.util.exception.MurmurHashException;

/**
 * consistancy hash, murmur hash
 * implemented by Guava
 * @author wuzhih
 *
 */
public class PartitionByMurmurHash extends AbstractPartitionAlgorithm implements RuleAlgorithm  {
    private static final int DEFAULT_VIRTUAL_BUCKET_TIMES=160;
    private static final int DEFAULT_WEIGHT=1;
    private static final Charset DEFAULT_CHARSET=Charset.forName("UTF-8");
    
    private int seed;
    private int count;
    private int virtualBucketTimes=DEFAULT_VIRTUAL_BUCKET_TIMES;
    private Map<Integer,Integer> weightMap=new HashMap<>();
//    private String bucketMapPath;
    
    private HashFunction hash;
    
    private SortedMap<Integer,Integer> bucketMap;
    @Override
    public void init()  {
        try{
            bucketMap=new TreeMap<>();
//            boolean serializableBucketMap=bucketMapPath!=null && bucketMapPath.length()>0;
//            if(serializableBucketMap){
//                File bucketMapFile=new File(bucketMapPath);
//                if(bucketMapFile.exists() && bucketMapFile.length()>0){
//                    loadBucketMapFile();
//                    return;
//                }
//            }
            generateBucketMap();
//            if(serializableBucketMap){
//                storeBucketMap();
//            }
        }catch(Exception e){
            throw new MurmurHashException(e);
        }
    }

    private void generateBucketMap(){
        hash=Hashing.murmur3_32(seed);// hash function used for consistent hashing
        for(int i=0;i<count;i++){// build the consistent hash ring, represented as a TreeMap
            StringBuilder hashName=new StringBuilder("SHARD-").append(i);
            for(int n=0,shard=virtualBucketTimes*getWeight(i);n<shard;n++){
                bucketMap.put(hash.hashUnencodedChars(hashName.append("-NODE-").append(n)).asInt(),i);
            }
        }
        weightMap=null;
    }
//    private void storeBucketMap() throws IOException{
//        try(OutputStream store=new FileOutputStream(bucketMapPath)){
//            Properties props=new Properties();
//            for(Map.Entry entry:bucketMap.entrySet()){
//                props.setProperty(entry.getKey().toString(), entry.getValue().toString());
//            }
//            props.store(store,null);
//        }
//    }
//    private void loadBucketMapFile() throws FileNotFoundException, IOException{
//        try(InputStream in=new FileInputStream(bucketMapPath)){
//            Properties props=new Properties();
//            props.load(in);
//            for(Map.Entry entry:props.entrySet()){
//                bucketMap.put(Integer.parseInt(entry.getKey().toString()), Integer.parseInt(entry.getValue().toString()));
//            }
//        }
//    }
    /**
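     * Returns the weight of a bucket; a bucket is a DB instance that actually stores data.
     * Keys are bucket numbers starting from 0, values are weights; the default weight is 1.
     * Keys and values must all be integers.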
     * @param bucket
     * @return
     */
    private int getWeight(int bucket){
        Integer w=weightMap.get(bucket);
        if(w==null){
            w=DEFAULT_WEIGHT;
        }
        return w;
    }
    /**
     * Seed used to create the murmur hash object; defaults to 0
     * @param seed
     */
    public void setSeed(int seed){
        this.seed=seed;
    }
    /**
     * Number of nodes
     * @param count
     */
    public void setCount(int count) {
        this.count = count;
    }
    /**
     * Virtual node multiplier; virtualBucketTimes*count is the number of virtual nodes
     * @param virtualBucketTimes
     */
    public void setVirtualBucketTimes(int virtualBucketTimes){
        this.virtualBucketTimes=virtualBucketTimes;
    }
    /**
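     * Node weights; a node without an explicit weight defaults to 1. Written in properties-file format: the key is the node index, an integer from 0 to count-1, and the value is the node weight.
     * Every weight must be a positive integer, otherwise 1 is used instead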
     * @param weightMapPath
     * @throws IOException
     * @throws
     */
    public void setWeightMapFile(String weightMapPath) throws IOException{
        Properties props=new Properties();
        try(BufferedReader reader=new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream(weightMapPath), DEFAULT_CHARSET))){
            props.load(reader);
            for(Map.Entry entry:props.entrySet()){
                int weight=Integer.parseInt(entry.getValue().toString());
                weightMap.put(Integer.parseInt(entry.getKey().toString()), weight>0?weight:1);
            }
        }
    }
//    /**
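//     * Path of the file in which the consistent hash virtual nodes are saved.
//     * If the file does not exist or is empty, a new MurmurHash data structure is built from the given count, weightMapFile, etc. and saved to this path.
//     * If the file exists and is not empty, its content is loaded as the MurmurHash data structure and all other parameters are ignored.
//     * After the first run, this file can be edited directly when nodes are added, but that is not recommended. If the number of nodes changes, deleting the file is recommended.
//     * The path is optional; if it is not set, the murmur hash is not saved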
//     * @param bucketMapPath
//     */
//    public void setBucketMapPath(String bucketMapPath){
//        this.bucketMapPath=bucketMapPath;
//    }
    @Override
    public Integer calculate(String columnValue) {
        SortedMap<Integer, Integer> tail = bucketMap.tailMap(hash.hashUnencodedChars(columnValue).asInt());
        if (tail.isEmpty()) {
            return bucketMap.get(bucketMap.firstKey());
        }
        return tail.get(tail.firstKey());
    }

    @Override
    public int getPartitionNum() {
        int nPartition = this.count;
        return nPartition;
    }

    private static void hashTest() throws IOException{
        PartitionByMurmurHash hash=new PartitionByMurmurHash();
        hash.count=10;// number of shards
        hash.init();

        int[] bucket=new int[hash.count];

        Map<Integer,List<Integer>> hashed=new HashMap<>();

        int total=1000_0000;// amount of data
        int c=0;
        for(int i=100_0000;i<total+100_0000;i++){// assume sharding keys start at 1,000,000
            c++;
            int h=hash.calculate(Integer.toString(i));
            bucket[h]++;
            List<Integer> list=hashed.get(h);
            if(list==null){
                list=new ArrayList<>();
                hashed.put(h, list);
            }
            list.add(i);
        }
        System.out.println(c+"   "+total);
        double d=0;
        c=0;
        int idx=0;
        System.out.println("index    bucket   ratio");
        for(int i:bucket){
            d+=i/(double)total;
            c+=i;
            System.out.println(idx+++"  "+i+"   "+(i/(double)total));
        }
        System.out.println(d+"  "+c);

        Properties props=new Properties();
        for(Map.Entry entry:hash.bucketMap.entrySet()){
            props.setProperty(entry.getKey().toString(), entry.getValue().toString());
        }
        ByteArrayOutputStream out=new ByteArrayOutputStream();
        props.store(out, null);

        props.clear();
        props.load(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(props);
        System.out.println("****************************************************");
//        rehashTest(hashed.get(0));
    }
    private static void rehashTest(List<Integer> partition){
        PartitionByMurmurHash hash=new PartitionByMurmurHash();
        hash.count=12;// number of shards
        hash.init();
        
        int[] bucket=new int[hash.count];
        
        int total=partition.size();// amount of data
        int c=0;
        for(int i:partition){// re-hash every key that was previously routed to this partition
            c++;
            int h=hash.calculate(Integer.toString(i));
            bucket[h]++;
        }
        System.out.println(c+"   "+total);
        c=0;
        int idx=0;
        System.out.println("index    bucket   ratio");
        for(int i:bucket){
            c+=i;
            System.out.println(idx+++"  "+i+"   "+(i/(double)total));
        }
    }
    public static void main(String[] args) throws IOException {
        hashTest();
    }
}