一、Mycat分片路由原理
我們先來看下面的一個SQL在Mycat里面是如何執行的:
select * from travelrecord where id in(5000001, 10000001);
有3個分片dn1,dn2,dn3, id=5000001這條數據在dn2上,id=10000001這條數據在dn3上。
查詢時可能有出現的問題:
1)全部掃描一遍dn1 dn2 dn3,結果導致性能浪費。
2)只掃描某個片。漏掉數據的情況。
總結:
不能多掃——>性能不足
也不能少——>漏掉數據
那么Mycat是如何解決上面的問題的呢?
Mycat使用Druid的DruidParser作為分析器/解析器,解析的過程主要有Visitor和Statement兩個階段
說明:
1)Visitor過程,解析出如下屬性:
哪一張表
字段列表
條件信息
什么樣的SQL
解析出以上信息以后就可以根據schema.xml和rule.xml文件確認要去哪個分片上進行DML操作了
2)Statement過程轉化:轉化后知道執行的是什么樣的SQL(增刪改查)
3)改寫SQL
通過查詢條件可以知道要查詢的數據都在哪些分片上
Dn2, id= 5000001
Dn3, id= 10000001
所以SQL被改寫成以下的形式:
select * from travelrecord where id = 5000001;(dn2執行) select * from travelrecord where id = 10000001;(dn3執行)
4)分別在分片dn2,dn3上執行第 3)步改寫的SQL,然后把從dn2,dn3上得到的結果進行拼裝就是最終的結果了
備注:
多表關聯查詢的時候,visitor會遍歷出所有的表,然后去各個分片上去獲取結果,同時把結果緩存起來,最后根據關聯查詢計算出結果。
確定分片的過程:首先看where條件里面是否含有分片字段,有就根據分片字段的值結合schema.xml、rule.xml的值確定是哪個分片。當不能確定在哪一個分片上的時候,mycat會到所有的分片上去找
二、Mycat常用分片規則
1. 時間類:按天分片、自然月分片、單月小時分片
2. 哈希類:Hash固定分片、日期范圍Hash分片、截取數字Hash求模范圍分片、截取數字Hash分片、一致性Hash分片
3. 取模類:取模分片、取模范圍分片、范圍求模分片
4. 其他類:枚舉分片、范圍約定分片、應用指定分片、冷熱數據分片
下面基於源碼來介紹Mycat的常用分片規則,源碼地址
三、Mycat常用分片規則介紹
說明:分片規則都定義在rule.xml文件里面
<!-- tableRule標簽:定義table分片策略 --> <tableRule name="rule1"> <!-- rule標簽:策略定義標簽 --> <rule> <!-- columns標簽:對應的分片字段 --> <columns>id</columns> <!-- algorithm標簽:tableRule分片策略對應的function名稱 --> <algorithm>func1</algorithm> </rule> </tableRule> <!-- 定義分片函數 --> <function name="func1" class="io.mycat.route.function.PartitionByLong"> <property name="partitionCount">1,1,2,3,1</property><!-- 分片數 --> <property name="partitionLength">128,128,128,128,128</property><!-- 分片長度 --> </function>
1. 自動范圍分片
在rule.xml里面的配置:
<tableRule name="auto-sharding-long"> <rule> <columns>id</columns> <algorithm>rang-long</algorithm> </rule> </tableRule> <function name="rang-long" class="io.mycat.route.function.AutoPartitionByLong"> <property name="mapFile">autopartition-long.txt</property> </function>
說明:
有3個分片,第1個分片存儲的是1-500000的數據,第2個分片存儲的是500001-1000000的數據,第3個分片存儲的是1000001-1500000的數據
insert into employee(id, name) values(1, 'Tom');在第1個分片
insert into employee(id, name) values(500002, 'Jack');在第2個分片
insert into employee(id, name) values(1000002, 'Lucy');在第3個分片
對應代碼:

package io.mycat.route.function;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

import io.mycat.config.model.rule.RuleAlgorithm;

/**
 * Auto partition by Long: routes a numeric (typically auto-increment primary
 * key) column value to the node whose configured [start, end] range contains
 * it. Ranges are loaded from {@code mapFile}, a classpath resource with lines
 * of the form {@code start-end=nodeIndex}.
 *
 * @author wuzhi
 */
public class AutoPartitionByLong extends AbstractPartitionAlgorithm implements RuleAlgorithm {
    private String mapFile;
    private LongRange[] longRanges;
    // Fallback node for values outside every configured range; < 0 disables it.
    private int defaultNode = -1;

    @Override
    public void init() {
        initialize();
    }

    public void setMapFile(String mapFile) {
        this.mapFile = mapFile;
    }

    /**
     * Resolves the target node for a sharding column value.
     *
     * @param columnValue the column value; must be a plain number without quotes
     * @return the node index whose range contains the value, the configured
     *         default node when the value is out of range, otherwise {@code null}
     * @throws IllegalArgumentException if the value is not a parseable long
     */
    @Override
    public Integer calculate(String columnValue) {
        try {
            long value = Long.parseLong(columnValue);
            for (LongRange longRange : this.longRanges) {
                if (value <= longRange.valueEnd && value >= longRange.valueStart) {
                    return longRange.nodeIndx;
                }
            }
            // Value lies outside all configured ranges: use the default node if set.
            // (The original kept a dead `rst` variable here that was always null.)
            if (defaultNode >= 0) {
                return defaultNode;
            }
            return null;
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("columnValue:" + columnValue
                    + " Please eliminate any quote and non number within it.", e);
        }
    }

    @Override
    public Integer[] calculateRange(String beginValue, String endValue) {
        return AbstractPartitionAlgorithm.calculateSequenceRange(this, beginValue, endValue);
    }

    @Override
    public int getPartitionNum() {
        // fix #1284: count distinct node indexes, not the number of ranges.
        Set<Integer> distNodeIdxSet = new HashSet<Integer>();
        for (LongRange range : longRanges) {
            distNodeIdxSet.add(range.nodeIndx);
        }
        return distNodeIdxSet.size();
    }

    /** Loads the range-to-node mapping from the classpath resource {@code mapFile}. */
    private void initialize() {
        BufferedReader in = null;
        try {
            InputStream fin = this.getClass().getClassLoader()
                    .getResourceAsStream(mapFile);
            if (fin == null) {
                throw new RuntimeException("can't find class resource file " + mapFile);
            }
            in = new BufferedReader(new InputStreamReader(fin));
            LinkedList<LongRange> longRangeList = new LinkedList<LongRange>();
            for (String line = null; (line = in.readLine()) != null;) {
                line = line.trim();
                // Skip comment lines.
                if (line.startsWith("#") || line.startsWith("//")) {
                    continue;
                }
                int ind = line.indexOf('=');
                if (ind < 0) {
                    System.out.println(" warn: bad line int " + mapFile + " :" + line);
                    continue;
                }
                String pairs[] = line.substring(0, ind).trim().split("-");
                long longStart = NumberParseUtil.parseLong(pairs[0].trim());
                long longEnd = NumberParseUtil.parseLong(pairs[1].trim());
                int nodeId = Integer.parseInt(line.substring(ind + 1).trim());
                longRangeList.add(new LongRange(nodeId, longStart, longEnd));
            }
            longRanges = longRangeList.toArray(new LongRange[longRangeList.size()]);
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else {
                throw new RuntimeException(e);
            }
        } finally {
            // Fix: guard against NPE when the reader was never opened
            // (previously masked by the empty catch).
            if (in != null) {
                try {
                    in.close();
                } catch (Exception ignored) {
                    // best-effort close
                }
            }
        }
    }

    public int getDefaultNode() {
        return defaultNode;
    }

    public void setDefaultNode(int defaultNode) {
        this.defaultNode = defaultNode;
    }

    /** Immutable [valueStart, valueEnd] -&gt; nodeIndx mapping entry. */
    static class LongRange {
        public final int nodeIndx;
        public final long valueStart;
        public final long valueEnd;

        public LongRange(int nodeIndx, long valueStart, long valueEnd) {
            super();
            this.nodeIndx = nodeIndx;
            this.valueStart = valueStart;
            this.valueEnd = valueEnd;
        }
    }
}
2. 枚舉分片
把數據分類存儲
在rule.xml里面的配置:
<tableRule name="sharding-by-intfile"> <rule> <columns>sharding_id</columns> <algorithm>hash-int</algorithm> </rule> </tableRule> <function name="hash-int" class="io.mycat.route.function.PartitionByFileMap"> <property name="mapFile">partition-hash-int.txt</property> <property name="defaultNode">0</property> <!-- 找不到分片時設置容錯規則,把數據插入到默認分片0里面 --> </function>
說明:找不到分片時設置容錯規則,把數據插入到默認分片0里面
對應代碼:

package io.mycat.route.function;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import io.mycat.config.model.rule.RuleAlgorithm;

/**
 * Enum (lookup-table) partitioning: maps discrete column values to node
 * indexes using a table loaded from {@code mapFile} (classpath resource,
 * lines of the form {@code value=nodeIndex}).
 *
 * @author mycat
 */
public class PartitionByFileMap extends AbstractPartitionAlgorithm implements RuleAlgorithm {

    private String mapFile;
    private Map<Object, Integer> app2Partition;

    /**
     * Key type of app2Partition: 0 (the default) means keys are Integer,
     * any non-zero value means keys are String.
     */
    private int type;

    /** Special map key under which the default node index is stored. */
    private static final String DEFAULT_NODE = "DEFAULT_NODE";

    /**
     * Default node: &lt; 0 means no default node is configured, &gt;= 0 sets one.
     *
     * Purpose: when an unrecognized enum value is seen it is routed to the
     * default node. Without a default node (defaultNode &lt; 0) an unrecognized
     * value fails, like this:
     * can't find datanode for sharding column:column_name val:ffffffff
     */
    private int defaultNode = -1;

    @Override
    public void init() {
        initialize();
    }

    public void setMapFile(String mapFile) {
        this.mapFile = mapFile;
    }

    public void setType(int type) {
        this.type = type;
    }

    public void setDefaultNode(int defaultNode) {
        this.defaultNode = defaultNode;
    }

    /**
     * Looks up the node for a column value.
     *
     * @param columnValue the column value; parsed as Integer when type == 0
     * @return the mapped node index, the default node when configured,
     *         otherwise {@code null}
     * @throws IllegalArgumentException if type == 0 and the value is not numeric
     */
    @Override
    public Integer calculate(String columnValue) {
        try {
            Object value = columnValue;
            if (type == 0) {
                value = Integer.valueOf(columnValue);
            }
            Integer rst = null;
            Integer pid = app2Partition.get(value);
            if (pid != null) {
                rst = pid;
            } else {
                // Unknown value: fall back to the default node entry (may be null).
                rst = app2Partition.get(DEFAULT_NODE);
            }
            return rst;
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("columnValue:" + columnValue
                    + " Please check if the format satisfied.", e);
        }
    }

    @Override
    public int getPartitionNum() {
        // Distinct node indexes, since several values may share a node.
        Set<Integer> set = new HashSet<Integer>(app2Partition.values());
        return set.size();
    }

    /** Loads the value-to-node table from the classpath resource {@code mapFile}. */
    private void initialize() {
        BufferedReader in = null;
        try {
            InputStream fin = this.getClass().getClassLoader()
                    .getResourceAsStream(mapFile);
            if (fin == null) {
                throw new RuntimeException("can't find class resource file " + mapFile);
            }
            in = new BufferedReader(new InputStreamReader(fin));
            app2Partition = new HashMap<Object, Integer>();
            for (String line = null; (line = in.readLine()) != null;) {
                line = line.trim();
                // Skip comment lines.
                if (line.startsWith("#") || line.startsWith("//")) {
                    continue;
                }
                int ind = line.indexOf('=');
                if (ind < 0) {
                    continue;
                }
                try {
                    String key = line.substring(0, ind).trim();
                    int pid = Integer.parseInt(line.substring(ind + 1).trim());
                    if (type == 0) {
                        app2Partition.put(Integer.parseInt(key), pid);
                    } else {
                        app2Partition.put(key, pid);
                    }
                } catch (Exception e) {
                    // Fix: make malformed lines visible instead of silently
                    // swallowing them (still best-effort — the line is skipped).
                    System.out.println(" warn: bad line int " + mapFile + " :" + line);
                }
            }
            // Register the default node entry when configured.
            if (defaultNode >= 0) {
                app2Partition.put(DEFAULT_NODE, defaultNode);
            }
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else {
                throw new RuntimeException(e);
            }
        } finally {
            // Fix: guard against NPE when the reader was never opened.
            if (in != null) {
                try {
                    in.close();
                } catch (Exception ignored) {
                    // best-effort close
                }
            }
        }
    }
}
3. Hash固定分片(固定分片Hash算法)
固定分片Hash算法,最多有1024個分片
在rule.xml里面的配置:
<tableRule name="rule1"> <!-- rule標簽:策略定義標簽 --> <rule> <!-- columns標簽:對應的分片字段 --> <columns>id</columns> <!-- algorithm標簽:tableRule分片策略對應的function名稱 --> <algorithm>func1</algorithm> </rule> </tableRule> <function name="func1" class="io.mycat.route.function.PartitionByLong"> <property name="partitionCount">1,1,2,3,1</property><!-- 分片數 --> <property name="partitionLength">128,128,128,128,128</property><!-- 分片長度 --> </function>
說明:
1) partitionCount.length必須等於partitionLength.length
2) sum(partitionCount[i]*partitionLength[i]) === 1024——>partitionCount[0]*partitionLength[0]+partitionCount[1]*partitionLength[1]+...+partitionCount[4]*partitionLength[4] === 1024
即:1*128+1*128+2*128+3*128+1*128 === 1024
eg:
8個分片表
第1個分片表的下標為0: 0-127
第2個分片表的下標為1: 128-255
...................
第8個分片表的下標為7: 896-1023
如何確定落在哪個分片上呢?分片id的值與1024取余可確定在哪個分片上:
如id%1024 = 128 則落在第2個分片上
對應代碼:

package io.mycat.route.function; import io.mycat.config.model.rule.RuleAlgorithm; import io.mycat.route.util.PartitionUtil; public final class PartitionByLong extends AbstractPartitionAlgorithm implements RuleAlgorithm { protected int[] count; protected int[] length; protected PartitionUtil partitionUtil; private static int[] toIntArray(String string) { String[] strs = io.mycat.util.SplitUtil.split(string, ',', true); int[] ints = new int[strs.length]; for (int i = 0; i < strs.length; ++i) { ints[i] = Integer.parseInt(strs[i]); } return ints; } public void setPartitionCount(String partitionCount) { this.count = toIntArray(partitionCount); } public void setPartitionLength(String partitionLength) { this.length = toIntArray(partitionLength); } @Override public void init() { partitionUtil = new PartitionUtil(count, length); } @Override public Integer calculate(String columnValue) { // columnValue = NumberParseUtil.eliminateQoute(columnValue); try { long key = Long.parseLong(columnValue); return partitionUtil.partition(key); } catch (NumberFormatException e){ throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please eliminate any quote and non number within it.").toString(),e); } } @Override public Integer[] calculateRange(String beginValue, String endValue) { return AbstractPartitionAlgorithm.calculateSequenceRange(this, beginValue, endValue); } // @Override // public int getPartitionCount() { // int nPartition = 0; // for(int i = 0; i < count.length; i++) { // nPartition += count[i]; // } // return nPartition; // } }
4. 求模分片
分片字段id%分片數=分片下標
在rule.xml里面的配置:
<tableRule name="mod-long"> <rule> <columns>id</columns><!--分片字段 --> <algorithm>mod-long</algorithm> </rule> </tableRule> <function name="mod-long" class="io.mycat.route.function.PartitionByMod"> <!-- how many data nodes --> <property name="count">3</property><!--分片數 --> </function>
對應代碼:

package io.mycat.route.function;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.mycat.config.model.rule.RuleAlgorithm;

/**
 * number column partion by Mod operator
 * if count is 10 then 0 to 0,21 to 1 (21 % 10 =1)
 * @author wuzhih
 */
public class PartitionByMod extends AbstractPartitionAlgorithm implements RuleAlgorithm {

    // Number of data nodes; the divisor of the modulo.
    private int count;

    @Override
    public void init() {
    }

    public void setCount(int count) {
        this.count = count;
    }

    /**
     * Returns abs(columnValue) % count. BigInteger handles values beyond the
     * long range; abs() keeps the result non-negative for negative keys.
     *
     * @throws IllegalArgumentException if the value is not a plain number
     */
    @Override
    public Integer calculate(String columnValue) {
        // columnValue = NumberParseUtil.eliminateQoute(columnValue);
        try {
            BigInteger bigNum = new BigInteger(columnValue).abs();
            return (bigNum.mod(BigInteger.valueOf(count))).intValue();
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please eliminate any quote and non number within it.").toString(), e);
        }
    }

    @Override
    public int getPartitionNum() {
        int nPartition = this.count;
        return nPartition;
    }

    // Manual distribution benchmark: hashes 10M sequential keys into 11 buckets
    // and prints per-bucket counts/ratios, then re-hashes bucket 0.
    private static void hashTest() {
        PartitionByMod hash = new PartitionByMod();
        hash.setCount(11);
        hash.init();
        int[] bucket = new int[hash.count];
        Map<Integer, List<Integer>> hashed = new HashMap<>();
        int total = 1000_0000;// data volume
        int c = 0;
        for (int i = 100_0000; i < total + 100_0000; i++) {// assume shard keys start at 1,000,000
            c++;
            int h = hash.calculate(Integer.toString(i));
            bucket[h]++;
            List<Integer> list = hashed.get(h);
            if (list == null) {
                list = new ArrayList<>();
                hashed.put(h, list);
            }
            list.add(i);
        }
        System.out.println(c + " " + total);
        double d = 0;
        c = 0;
        int idx = 0;
        System.out.println("index    bucket   ratio");
        for (int i : bucket) {
            d += i / (double) total;
            c += i;
            System.out.println(idx++ + "  " + i + "   " + (i / (double) total));
        }
        System.out.println(d + "  " + c);
        System.out.println("****************************************************");
        rehashTest(hashed.get(0));
    }

    // Re-distributes the keys of one bucket over a larger node count to
    // observe how modulo sharding behaves on resharding.
    private static void rehashTest(List<Integer> partition) {
        PartitionByMod hash = new PartitionByMod();
        hash.count = 110;// shard count
        hash.init();
        int[] bucket = new int[hash.count];
        int total = partition.size();// data volume
        int c = 0;
        for (int i : partition) {// assume shard keys start at 1,000,000
            c++;
            int h = hash.calculate(Integer.toString(i));
            bucket[h]++;
        }
        System.out.println(c + " " + total);
        c = 0;
        int idx = 0;
        System.out.println("index    bucket   ratio");
        for (int i : bucket) {
            c += i;
            System.out.println(idx++ + "  " + i + "   " + (i / (double) total));
        }
    }

    public static void main(String[] args) {
        // hashTest();
        PartitionByMod partitionByMod = new PartitionByMod();
        partitionByMod.count = 8;
        // NOTE(review): both calls below throw IllegalArgumentException because
        // the values are quoted — this appears to be a manual demo of the
        // quote-rejection behavior, not a passing test.
        partitionByMod.calculate("\"6\"");
        partitionByMod.calculate("\'6\'");
    }
}
5. 自然月分片
按照自然月的方式進行分片
在rule.xml里面的配置:
<tableRule name="sharding-by-month"> <rule> <columns>create_time</columns> <algorithm>partbymonth</algorithm> </rule> </tableRule> <function name="partbymonth" class="io.mycat.route.function.PartitionByMonth"> <property name="dateFormat">yyyy-MM-dd</property> <property name="sBeginDate">2015-01-01</property> </function>
說明:
如果月份超過了分片數,則通過設置sEndDate的值來解決
如有3個分片,分別插入2015-01-12、2015-02-12、2015-03-12、2016-11-12,月份超過了分片數,此時設置sEndDate=2015-04-12表示4個月放一個分片,如下可知2016-11-12在分片2上
分片0 |
分片1 |
分片2 |
1月 |
5月 |
9月 |
2月 |
6月 |
10月 |
3月 |
7月 |
11月 |
4月 |
8月 |
12月 |
對應代碼:

package io.mycat.route.function;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;

import io.mycat.config.model.rule.RuleAlgorithm;
import org.apache.log4j.Logger;

/**
 * Partition by natural month: one partition per calendar month starting from
 * sBeginDate. When sEndDate is configured, the months form a fixed ring of
 * nPartition slots and out-of-window dates are wrapped onto it (circulatory
 * partitioning). Supports BETWEEN range resolution via calculateRange.
 *
 * @author wzh
 */
public class PartitionByMonth extends AbstractPartitionAlgorithm implements RuleAlgorithm {
    // Fix: log under this class; the original used PartitionByDate.class
    // (copy-paste slip), which misattributed all log output.
    private static final Logger LOGGER = Logger.getLogger(PartitionByMonth.class);

    private String sBeginDate;
    private String dateFormat;
    private String sEndDate;
    private Calendar beginDate;
    private Calendar endDate;
    // Ring size in months, or -1 when no end date is configured (unbounded).
    private int nPartition;

    // SimpleDateFormat is not thread-safe; one instance per thread.
    private ThreadLocal<SimpleDateFormat> formatter;

    @Override
    public void init() {
        try {
            beginDate = Calendar.getInstance();
            beginDate.setTime(new SimpleDateFormat(dateFormat)
                    .parse(sBeginDate));
            formatter = new ThreadLocal<SimpleDateFormat>() {
                @Override
                protected SimpleDateFormat initialValue() {
                    return new SimpleDateFormat(dateFormat);
                }
            };
            if (sEndDate != null && !sEndDate.equals("")) {
                endDate = Calendar.getInstance();
                endDate.setTime(new SimpleDateFormat(dateFormat).parse(sEndDate));
                // Number of whole months between begin and end, inclusive.
                nPartition = ((endDate.get(Calendar.YEAR) - beginDate.get(Calendar.YEAR)) * 12
                        + endDate.get(Calendar.MONTH) - beginDate.get(Calendar.MONTH)) + 1;
                if (nPartition <= 0) {
                    throw new java.lang.IllegalArgumentException("Incorrect time range for month partitioning!");
                }
            } else {
                nPartition = -1;
            }
        } catch (ParseException e) {
            throw new java.lang.IllegalArgumentException(e);
        }
    }

    /**
     * For circulatory partition, calculated value of target partition needs to be
     * rotated to fit the partition range
     */
    private int reCalculatePartition(int targetPartition) {
        /**
         * If target date is previous of start time of partition setting, shift
         * the delta range between target and start date to be positive value
         */
        if (targetPartition < 0) {
            targetPartition = nPartition - (-targetPartition) % nPartition;
        }
        if (targetPartition >= nPartition) {
            targetPartition = targetPartition % nPartition;
        }
        return targetPartition;
    }

    /**
     * Maps a date string to its month-offset partition (wrapped onto the ring
     * when an end date is configured).
     *
     * @throws IllegalArgumentException if the value does not match dateFormat
     */
    @Override
    public Integer calculate(String columnValue) {
        try {
            int targetPartition;
            Calendar curTime = Calendar.getInstance();
            curTime.setTime(formatter.get().parse(columnValue));
            // Whole months elapsed since the configured begin date.
            targetPartition = ((curTime.get(Calendar.YEAR) - beginDate.get(Calendar.YEAR)) * 12
                    + curTime.get(Calendar.MONTH) - beginDate.get(Calendar.MONTH));
            // Wrap onto the ring when circulatory partitioning is active.
            if (nPartition > 0) {
                targetPartition = reCalculatePartition(targetPartition);
            }
            return targetPartition;
        } catch (ParseException e) {
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please check if the format satisfied.").toString(), e);
        }
    }

    /**
     * Resolves the distinct partitions covered by [beginValue, endValue].
     * Returns an empty array (and logs) when either bound fails to parse.
     */
    @Override
    public Integer[] calculateRange(String beginValue, String endValue) {
        try {
            int startPartition, endPartition;
            Calendar partitionTime = Calendar.getInstance();
            SimpleDateFormat format = new SimpleDateFormat(dateFormat);
            partitionTime.setTime(format.parse(beginValue));
            startPartition = ((partitionTime.get(Calendar.YEAR) - beginDate.get(Calendar.YEAR)) * 12
                    + partitionTime.get(Calendar.MONTH) - beginDate.get(Calendar.MONTH));
            partitionTime.setTime(format.parse(endValue));
            endPartition = ((partitionTime.get(Calendar.YEAR) - beginDate.get(Calendar.YEAR)) * 12
                    + partitionTime.get(Calendar.MONTH) - beginDate.get(Calendar.MONTH));
            List<Integer> list = new ArrayList<>();
            // Collect each month's (possibly wrapped) partition, de-duplicated.
            while (startPartition <= endPartition) {
                Integer nodeValue = reCalculatePartition(startPartition);
                if (Collections.frequency(list, nodeValue) < 1)
                    list.add(nodeValue);
                startPartition++;
            }
            int size = list.size();
            return (list.toArray(new Integer[size]));
        } catch (ParseException e) {
            LOGGER.error("error", e);
            return new Integer[0];
        }
    }

    @Override
    public int getPartitionNum() {
        int nPartition = this.nPartition;
        return nPartition;
    }

    public void setsBeginDate(String sBeginDate) {
        this.sBeginDate = sBeginDate;
    }

    public void setDateFormat(String dateFormat) {
        this.dateFormat = dateFormat;
    }

    public void setsEndDate(String sEndDate) {
        this.sEndDate = sEndDate;
    }
}
6. 匹配求模分片
根據prefixLength截取n個字符並charAt(i)每個字符的值進行累加得到一個整數,然后和分區長度patternValue進行求模,得出的值就是分區編號。
在rule.xml里面的配置:
<tableRule name="partitionbyprefixpattern"> <rule> <columns>id</columns> <algorithm>partitionbyprefixpattern</algorithm> </rule> </tableRule> <function name="partitionbyprefixpattern" class="io.mycat.route.function.PartitionByPrefixPattern"> <property name="patternValue">3</property> <!-- 分區長度/分區數量 --> <property name="prefixLength">6</property> <!-- 截取多少字符串 --> </function>
說明:
有下面這種類型的數據
年月+大區+流水編號
201801 01 10001
就可以采用匹配求模分片,把分片字段columns取前6個字符串201801並charAt(i)每個字符的值進行累加得到一個整數,然后和分區長度3進行求模,得出的值就是分區編號
對應代碼:

package io.mycat.route.function;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

import io.mycat.config.model.rule.RuleAlgorithm;
// Fix: removed the import of AutoPartitionByLong.LongRange — this class
// declares its own nested LongRange, which shadows the import, so the
// import was dead and misleading.

/**
 * partition by Prefix length ,can be used in String partition:
 * sums the character codes of the first prefixLength characters of the
 * column value, takes the sum modulo patternValue, and maps the result
 * onto the ranges configured in mapFile ({@code start-end=nodeIndex} lines).
 *
 * @author hexiaobin
 */
public class PartitionByPrefixPattern extends AbstractPartitionAlgorithm implements RuleAlgorithm {
    private static final int PARTITION_LENGTH = 1024;
    // Modulus applied to the prefix character sum (defaults to 1024).
    private int patternValue = PARTITION_LENGTH;
    // Number of leading characters whose codes are summed.
    private int prefixLength;
    private String mapFile;
    private LongRange[] longRanges;

    @Override
    public void init() {
        initialize();
    }

    public void setMapFile(String mapFile) {
        this.mapFile = mapFile;
    }

    public void setPatternValue(int patternValue) {
        this.patternValue = patternValue;
    }

    public void setPrefixLength(int prefixLength) {
        this.prefixLength = prefixLength;
    }

    /**
     * Maps a string column value onto a node via its prefix-sum hash.
     *
     * @return the node whose range contains (prefix sum % patternValue),
     *         or {@code null} when no range matches
     */
    @Override
    public Integer calculate(String columnValue) {
        try {
            // Sum the codes of up to prefixLength leading characters.
            // (Fix: dropped the pointless Integer.valueOf boxing of prefixLength.)
            int length = Math.min(columnValue.length(), prefixLength);
            int sum = 0;
            for (int i = 0; i < length; i++) {
                sum = sum + columnValue.charAt(i);
            }
            // Fix: the hash is loop-invariant — compute it once instead of
            // recomputing it for every configured range.
            long hash = sum % patternValue;
            for (LongRange longRange : this.longRanges) {
                if (hash <= longRange.valueEnd && hash >= longRange.valueStart) {
                    return longRange.nodeIndx;
                }
            }
            return null;
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("columnValue:" + columnValue
                    + " Please eliminate any quote and non number within it.", e);
        }
    }

    @Override
    public int getPartitionNum() {
        // fix #1284: count distinct node indexes, not the number of ranges.
        Set<Integer> distNodeIdxSet = new HashSet<Integer>();
        for (LongRange range : longRanges) {
            distNodeIdxSet.add(range.nodeIndx);
        }
        return distNodeIdxSet.size();
    }

    /** Loads the range-to-node mapping from the classpath resource {@code mapFile}. */
    private void initialize() {
        BufferedReader in = null;
        try {
            InputStream fin = this.getClass().getClassLoader()
                    .getResourceAsStream(mapFile);
            if (fin == null) {
                throw new RuntimeException("can't find class resource file " + mapFile);
            }
            in = new BufferedReader(new InputStreamReader(fin));
            LinkedList<LongRange> longRangeList = new LinkedList<LongRange>();
            for (String line = null; (line = in.readLine()) != null;) {
                line = line.trim();
                // Skip comment lines.
                if (line.startsWith("#") || line.startsWith("//")) {
                    continue;
                }
                int ind = line.indexOf('=');
                if (ind < 0) {
                    System.out.println(" warn: bad line int " + mapFile + " :" + line);
                    continue;
                }
                String pairs[] = line.substring(0, ind).trim().split("-");
                long longStart = NumberParseUtil.parseLong(pairs[0].trim());
                long longEnd = NumberParseUtil.parseLong(pairs[1].trim());
                int nodeId = Integer.parseInt(line.substring(ind + 1).trim());
                longRangeList.add(new LongRange(nodeId, longStart, longEnd));
            }
            longRanges = longRangeList.toArray(new LongRange[longRangeList.size()]);
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            } else {
                throw new RuntimeException(e);
            }
        } finally {
            // Fix: guard against NPE when the reader was never opened.
            if (in != null) {
                try {
                    in.close();
                } catch (Exception ignored) {
                    // best-effort close
                }
            }
        }
    }

    /** Immutable [valueStart, valueEnd] -&gt; nodeIndx mapping entry. */
    static class LongRange {
        public final int nodeIndx;
        public final long valueStart;
        public final long valueEnd;

        public LongRange(int nodeIndx, long valueStart, long valueEnd) {
            super();
            this.nodeIndx = nodeIndx;
            this.valueStart = valueStart;
            this.valueEnd = valueEnd;
        }
    }
}
7. 冷熱數據分片
根據日期查詢日志數據冷熱數據分布 ,最近 n 個月的到實時交易庫查詢,超過 n 個月的按照 m 天分片。
在rule.xml里面的配置:
<tableRule name="sharding-by-date"> <rule> <columns>create_time</columns> <algorithm>sharding-by-hotdate</algorithm> </rule> </tableRule> <function name="sharding-by-hotdate" class="org.opencloudb.route.function.PartitionByHotDate"> <property name="dateFormat">yyyy-MM-dd</property> <!-- 定義日期格式 --> <property name="sLastDay">30</property> <!-- 熱庫存儲多少天數據 --> <property name="sPartionDay">30</property> <!-- 超過熱庫期限的數據按照多少天來分片 --> </function>
對應代碼:

package io.mycat.route.function;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.config.model.rule.RuleAlgorithm;

/**
 * Hot/cold data partitioning by date: rows within the last sLastDay days are
 * routed to the "hot" partition 0 (real-time store); older rows go to cold
 * partitions of sPartionDay days each, starting at index 1.
 *
 * Sample rule.xml configuration:
 * <tableRule name="sharding-by-date"> <rule> <columns>create_time</columns>
 * <algorithm>sharding-by-hotdate</algorithm> </rule> </tableRule>
 * <function name="sharding-by-hotdate" class="org.opencloudb.route.function.PartitionByHotDate">
 * <property name="dateFormat">yyyy-MM-dd</property>
 * <property name="sLastDay">10</property>
 * <property name="sPartionDay">30</property> </function>
 *
 * @author sw
 */
public class PartitionByHotDate extends AbstractPartitionAlgorithm implements RuleAlgorithm {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionByHotDate.class);

    // Pattern used to parse the sharding column value.
    private String dateFormat;
    // How many recent days stay in the hot partition (partition 0).
    private String sLastDay;
    // Size of each cold partition, in days.
    private String sPartionDay;

    // sLastDay parsed as a number of days.
    private long sLastTime;
    // Cold partition span in milliseconds.
    private long partionTime;
    // SimpleDateFormat is not thread-safe; one instance per thread.
    private ThreadLocal<SimpleDateFormat> formatter;
    // Start of the hot window; recomputed from "now" on every calculate() call.
    private long beginDate;

    // One day in milliseconds.
    private static final long oneDay = 86400000;

    @Override
    public void init() {
        try {
            formatter = new ThreadLocal<SimpleDateFormat>() {
                @Override
                protected SimpleDateFormat initialValue() {
                    return new SimpleDateFormat(dateFormat);
                }
            };
            sLastTime = Integer.valueOf(sLastDay);
            partionTime = Integer.parseInt(sPartionDay) * oneDay;
        } catch (Exception e) {
            throw new java.lang.IllegalArgumentException(e);
        }
    }

    /**
     * Maps a date value to its partition relative to the current wall clock.
     * Returns 0 for dates inside the hot window (or in the future), otherwise
     * 1 + the number of whole cold-partition spans the date lies before the
     * hot-window boundary.
     *
     * @throws IllegalArgumentException if the value does not match dateFormat
     */
    @Override
    public Integer calculate(String columnValue) {
        Integer targetPartition = -1;
        try {
            long targetTime = formatter.get().parse(columnValue).getTime();
            Calendar now = Calendar.getInstance();
            long nowTime = now.getTimeInMillis();
            beginDate = nowTime - sLastTime * oneDay;
            long diffDays = (nowTime - targetTime) / (1000 * 60 * 60 * 24) + 1;
            if (diffDays - sLastTime <= 0 || diffDays < 0) {
                // Inside the hot window (or a future date): hot partition.
                targetPartition = 0;
            } else {
                // Older data: cold partitions start at index 1.
                targetPartition = (int) ((beginDate - targetTime) / partionTime) + 1;
            }
            LOGGER.debug("PartitionByHotDate calculate for " + columnValue + " return " + targetPartition);
            return targetPartition;
        } catch (ParseException e) {
            throw new IllegalArgumentException(new StringBuilder().append("columnValue:").append(columnValue).append(" Please check if the format satisfied.").toString(), e);
        }
    }

    /**
     * Resolves the partitions covered by [beginValue, endValue]. Partition
     * index decreases as the date increases, so the range start maps to the
     * highest index and the (possibly clamped) range end to the lowest; when
     * the range reaches into the hot window, partition 0 is prepended.
     */
    @Override
    public Integer[] calculateRange(String beginValue, String endValue) {
        Integer[] targetPartition = null;
        try {
            long startTime = formatter.get().parse(beginValue).getTime();
            long endTime = formatter.get().parse(endValue).getTime();
            Calendar now = Calendar.getInstance();
            long nowTime = now.getTimeInMillis();
            long limitDate = nowTime - sLastTime * oneDay;
            long diffDays = (nowTime - startTime) / (1000 * 60 * 60 * 24) + 1;
            if (diffDays - sLastTime <= 0 || diffDays < 0) {
                // Whole range lies in the hot window.
                Integer[] re = new Integer[1];
                re[0] = 0;
                targetPartition = re;
            } else {
                Integer[] re = null;
                Integer begin = 0, end = 0;
                end = this.calculate(beginValue);
                boolean hasLimit = false;
                if (endTime - limitDate > 0) {
                    // Clamp the range end to the hot-window boundary.
                    endTime = limitDate;
                    hasLimit = true;
                }
                begin = this.calculate(formatter.get().format(endTime));
                if (begin == null || end == null) {
                    return re;
                }
                if (end >= begin) {
                    int len = end - begin + 1;
                    if (hasLimit) {
                        // Range reaches into the hot window: prepend partition 0.
                        re = new Integer[len + 1];
                        re[0] = 0;
                        for (int i = 0; i < len; i++) {
                            re[i + 1] = begin + i;
                        }
                    } else {
                        re = new Integer[len];
                        for (int i = 0; i < len; i++) {
                            re[i] = begin + i;
                        }
                    }
                    return re;
                } else {
                    // Inverted bounds: returns null, caller treats as unresolved.
                    return re;
                }
            }
        } catch (ParseException e) {
            throw new IllegalArgumentException(new StringBuilder().append("endValue:").append(endValue).append(" Please check if the format satisfied.").toString(), e);
        }
        return targetPartition;
    }

    public void setsPartionDay(String sPartionDay) {
        this.sPartionDay = sPartionDay;
    }

    public void setDateFormat(String dateFormat) {
        this.dateFormat = dateFormat;
    }

    public String getsLastDay() {
        return sLastDay;
    }

    public void setsLastDay(String sLastDay) {
        this.sLastDay = sLastDay;
    }
}
8. 一致性哈希分片
1)首先求出mysql服務器(節點)的哈希值,並將其配置到0~2^32的圓(continuum)上。
2)為每台mysql服務器物理節點虛擬出多個虛擬節點,並計算hash值映射到相同的圓上。
3)然后從數據映射到的mysql服務器虛擬節點的位置開始順時針查找,將數據保存到找到的第一個mysql服務器上。如果超過2^32仍然找不到服務器,就會保存到第一台mysql服務器上。
特點:解決數據均勻分布
在rule.xml里面的配置:
<tableRule name="sharding-by-murmur"> <rule> <columns>id</columns> <algorithm>murmur</algorithm> </rule> </tableRule> <function name="murmur" class="io.mycat.route.function.PartitionByMurmurHash"> <property name="seed">0</property><!-- 默認是0 --> <property name="count">2</property><!-- 要分片的數據庫節點數量,必須指定,否則沒法分片 --> <property name="virtualBucketTimes">160</property><!-- 一個實際的數據庫節點被映射為這么多虛擬節點,默認是160倍,也就是虛擬節點數是物理節點數的160倍 --> <!-- <property name="weightMapFile">weightMapFile</property> 節點的權重,沒有指定權重的節點默認是1。以properties文件的格式填寫,以從0開始到count-1的整數值也就是節點索引為key,以節點權重值為值。所有權重值必須是正整數,否則以1代替 --> <!-- <property name="bucketMapPath">/etc/mycat/bucketMapPath</property> 用於測試時觀察各物理節點與虛擬節點的分布情況,如果指定了這個屬性,會把虛擬節點的murmur hash值與物理節點的映射按行輸出到這個文件,沒有默認值,如果不指定,就不會輸出任何東西 --> </function>
對應代碼:

package io.mycat.route.function;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.SortedMap;
import java.util.TreeMap;

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import io.mycat.config.model.rule.RuleAlgorithm;
import io.mycat.util.exception.MurmurHashException;

/**
 * consistancy hash, murmur hash
 * implemented by Guava
 * @author wuzhih
 */
public class PartitionByMurmurHash extends AbstractPartitionAlgorithm implements RuleAlgorithm {
    private static final int DEFAULT_VIRTUAL_BUCKET_TIMES = 160;
    private static final int DEFAULT_WEIGHT = 1;
    private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

    // Seed for the murmur3_32 hash function (default 0).
    private int seed;
    // Number of physical data nodes.
    private int count;
    // Virtual nodes per physical node (scaled by weight).
    private int virtualBucketTimes = DEFAULT_VIRTUAL_BUCKET_TIMES;
    // Node index -> weight; discarded after the ring is built.
    private Map<Integer, Integer> weightMap = new HashMap<>();
//    private String bucketMapPath;

    private HashFunction hash;

    // The consistent-hash ring: virtual-node hash -> physical node index.
    private SortedMap<Integer, Integer> bucketMap;

    @Override
    public void init() {
        try {
            bucketMap = new TreeMap<>();
//            boolean serializableBucketMap=bucketMapPath!=null && bucketMapPath.length()>0;
//            if(serializableBucketMap){
//                File bucketMapFile=new File(bucketMapPath);
//                if(bucketMapFile.exists() && bucketMapFile.length()>0){
//                    loadBucketMapFile();
//                    return;
//                }
//            }
            generateBucketMap();
//            if(serializableBucketMap){
//                storeBucketMap();
//            }
        } catch (Exception e) {
            throw new MurmurHashException(e);
        }
    }

    // Builds the consistent-hash ring (TreeMap) from virtual node names.
    private void generateBucketMap() {
        hash = Hashing.murmur3_32(seed);// hash function used for both nodes and keys
        for (int i = 0; i < count; i++) {// one group of virtual nodes per physical node
            StringBuilder hashName = new StringBuilder("SHARD-").append(i);
            for (int n = 0, shard = virtualBucketTimes * getWeight(i); n < shard; n++) {
                // NOTE(review): hashName is mutated on every iteration, so the
                // virtual-node names accumulate ("SHARD-0-NODE-0-NODE-1...").
                // This looks unintended, but it is behavior-defining: changing
                // it would remap already-stored data — confirm before touching.
                bucketMap.put(hash.hashUnencodedChars(hashName.append("-NODE-").append(n)).asInt(), i);
            }
        }
        weightMap = null;// weights are no longer needed once the ring is built
    }
//    private void storeBucketMap() throws IOException{
//        try(OutputStream store=new FileOutputStream(bucketMapPath)){
//            Properties props=new Properties();
//            for(Map.Entry entry:bucketMap.entrySet()){
//                props.setProperty(entry.getKey().toString(), entry.getValue().toString());
//            }
//            props.store(store,null);
//        }
//    }
//    private void loadBucketMapFile() throws FileNotFoundException, IOException{
//        try(InputStream in=new FileInputStream(bucketMapPath)){
//            Properties props=new Properties();
//            props.load(in);
//            for(Map.Entry entry:props.entrySet()){
//                bucketMap.put(Integer.parseInt(entry.getKey().toString()), Integer.parseInt(entry.getValue().toString()));
//            }
//        }
//    }

    /**
     * Returns the weight of a bucket (a bucket is a physical DB instance).
     * Keys are bucket indexes starting at 0; the default weight is 1.
     * Both keys and values must be integers.
     * @param bucket
     * @return
     */
    private int getWeight(int bucket) {
        Integer w = weightMap.get(bucket);
        if (w == null) {
            w = DEFAULT_WEIGHT;
        }
        return w;
    }

    /**
     * Seed for creating the murmur_hash object, default 0.
     * @param seed
     */
    public void setSeed(int seed) {
        this.seed = seed;
    }

    /**
     * Number of physical nodes.
     * @param count
     */
    public void setCount(int count) {
        this.count = count;
    }

    /**
     * Virtual node multiplier; virtualBucketTimes*count is the number of virtual nodes.
     * @param virtualBucketTimes
     */
    public void setVirtualBucketTimes(int virtualBucketTimes) {
        this.virtualBucketTimes = virtualBucketTimes;
    }

    /**
     * Node weights; nodes without an explicit weight default to 1. Written in
     * properties-file format with integer node indexes 0..count-1 as keys and
     * the weight as value. All weights must be positive integers, otherwise 1
     * is used instead.
     * @param weightMapPath
     * @throws IOException
     * @throws
     */
    public void setWeightMapFile(String weightMapPath) throws IOException {
        Properties props = new Properties();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(this.getClass().getClassLoader().getResourceAsStream(weightMapPath), DEFAULT_CHARSET))) {
            props.load(reader);
            for (Map.Entry entry : props.entrySet()) {
                int weight = Integer.parseInt(entry.getValue().toString());
                weightMap.put(Integer.parseInt(entry.getKey().toString()), weight > 0 ? weight : 1);
            }
        }
    }
//    /**
//     * Path of the file persisting the consistent-hash virtual nodes.
//     * If this file is missing or empty, a new MurmurHash structure is built
//     * from count, weightMapFile etc. and saved to this path.
//     * If the file exists and is non-empty its contents are loaded as the
//     * MurmurHash structure and all other parameters are ignored.
//     * The file may be edited by hand to add nodes later, though this is not
//     * recommended; if the node count changes, deleting the file is advised.
//     * The path may be left unset, in which case nothing is persisted.
//     * @param bucketMapPath
//     */
//    public void setBucketMapPath(String bucketMapPath){
//        this.bucketMapPath=bucketMapPath;
//    }

    /**
     * Walks clockwise on the ring: returns the node of the first virtual node
     * at or after the key's hash, wrapping to the first ring entry when the
     * hash is past the last virtual node.
     */
    @Override
    public Integer calculate(String columnValue) {
        SortedMap<Integer, Integer> tail = bucketMap.tailMap(hash.hashUnencodedChars(columnValue).asInt());
        if (tail.isEmpty()) {
            return bucketMap.get(bucketMap.firstKey());
        }
        return tail.get(tail.firstKey());
    }

    @Override
    public int getPartitionNum() {
        int nPartition = this.count;
        return nPartition;
    }

    // Manual distribution benchmark: hashes 10M sequential keys into 10 nodes,
    // prints per-node counts/ratios, and round-trips the ring through Properties.
    private static void hashTest() throws IOException {
        PartitionByMurmurHash hash = new PartitionByMurmurHash();
        hash.count = 10;// shard count
        hash.init();
        int[] bucket = new int[hash.count];
        Map<Integer, List<Integer>> hashed = new HashMap<>();
        int total = 1000_0000;// data volume
        int c = 0;
        for (int i = 100_0000; i < total + 100_0000; i++) {// assume shard keys start at 1,000,000
            c++;
            int h = hash.calculate(Integer.toString(i));
            bucket[h]++;
            List<Integer> list = hashed.get(h);
            if (list == null) {
                list = new ArrayList<>();
                hashed.put(h, list);
            }
            list.add(i);
        }
        System.out.println(c + " " + total);
        double d = 0;
        c = 0;
        int idx = 0;
        System.out.println("index    bucket   ratio");
        for (int i : bucket) {
            d += i / (double) total;
            c += i;
            System.out.println(idx++ + "  " + i + "   " + (i / (double) total));
        }
        System.out.println(d + "  " + c);
        Properties props = new Properties();
        for (Map.Entry entry : hash.bucketMap.entrySet()) {
            props.setProperty(entry.getKey().toString(), entry.getValue().toString());
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        props.store(out, null);
        props.clear();
        props.load(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(props);
        System.out.println("****************************************************");
//        rehashTest(hashed.get(0));
    }

    // Re-distributes the keys of one node over a larger ring to observe how
    // consistent hashing behaves when nodes are added.
    private static void rehashTest(List<Integer> partition) {
        PartitionByMurmurHash hash = new PartitionByMurmurHash();
        hash.count = 12;// shard count
        hash.init();
        int[] bucket = new int[hash.count];
        int total = partition.size();// data volume
        int c = 0;
        for (int i : partition) {// assume shard keys start at 1,000,000
            c++;
            int h = hash.calculate(Integer.toString(i));
            bucket[h]++;
        }
        System.out.println(c + " " + total);
        c = 0;
        int idx = 0;
        System.out.println("index    bucket   ratio");
        for (int i : bucket) {
            c += i;
            System.out.println(idx++ + "  " + i + "   " + (i / (double) total));
        }
    }

    public static void main(String[] args) throws IOException {
        hashTest();
    }
}