TDDL大家應該很熟悉了,淘寶分布式數據層。很好的為我們實現了分庫分表、Master/Salve、動態數據源配置等功能。
那么分布式之后,數據庫自增序列肯定用不了了,如何方便快捷的解決這個問題呢?TDDL也提供了SEQUENCE的解決方案。
總述
在數據庫中創建 sequence 表,用於記錄,當前已被占用的id最大值。
每台客戶端主機取一個id區間(比如 1000~2000)緩存在本地,並更新 sequence 表中的id最大值記錄。
客戶端主機之間取不同的id區間,用完再取,使用樂觀鎖機制控制並發。
第一步:創建一張sequence對應的表
CREATE TABLE `imp_sequence` (
`BIZ_NAME` varchar(45) NOT NULL COMMENT '業務名稱',
`CURRENT_VALUE` int(11) NOT NULL COMMENT '當前最大值',
`GMT_CREATE` datetime DEFAULT NULL COMMENT '創建時間',
`GMT_MODIFIED` datetime DEFAULT NULL COMMENT '修改時間',
PRIMARY KEY (`BIZ_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='數據序列表';
表名和字段可以按各自規則定義,定義之后需要與第二步DAO中的定義相對應!
幾張邏輯表需要聲明幾個sequence。
第二步:配置sequenceDao

<bean id="sequenceDao" class="com.taobao.tddl.client.sequence.impl.DefaultSequenceDao">
<!-- 數據源 -->
<property name="dataSource" ref="dataSource" />
<!-- 步長-->
<property name="step" value="1000" />
<!-- 重試次數-->
<property name="retryTimes" value="1" />
<!-- sequence 表名-->
<property name="tableName" value="gt_sequence" />
<!-- sequence 名稱-->
<property name="nameColumnName" value="BIZ_NAME" />
<!-- sequence 當前值-->
<property name="valueColumnName" value="CURRENT_VALUE" />
<!-- sequence 更新時間-->
<property name="gmtModifiedColumnName" value="gmt_modified" />
</bean>
第三步:配置sequence生成器

<bean id="businessSequence" class="com.taobao.tddl.client.sequence.impl.DefaultSequence">
<property name="sequenceDao" ref="sequenceDao"/>
<property name="name" value="business_sequence" />
</bean>
第四步:調用

public class IbatisSmDAO extends SqlMapClientDaoSupport implements SmDAO { /**smSequence*/ private DefaultSequence businessSequence; public int insert(SmDO sm) throws DataAccessException { if (sm == null) { throw new IllegalArgumentException("Can't insert a null data object into db."); } try { sm.setId((int)businessSequence.nextValue()); } catch (SequenceException e) { throw new RuntimeException("Can't get primary key."); } getSqlMapClientTemplate().insert("MS-SM-INSERT", sm); return sm.getId(); } }
從調用配置中,我們可以發現其中涉及到二個重要類DefaultSequenceDao和DefaultSequence,這二個都是TDDL的默認實現。DefaultSequenceDao:序列DAO默認實現,JDBC方式。DefaultSequence:序列默認實現。
先來看DefaultSequenceDao,TDDL中提供了默認的表名,列名和步長等,第一步的建表可以參照默認方式。

private static final int MIN_STEP = 1; private static final int MAX_STEP = 100000; private static final int DEFAULT_STEP = 1000; private static final int DEFAULT_RETRY_TIMES = 150; private static final String DEFAULT_TABLE_NAME = "sequence"; private static final String DEFAULT_NAME_COLUMN_NAME = "name"; private static final String DEFAULT_VALUE_COLUMN_NAME = "value"; private static final String DEFAULT_GMT_MODIFIED_COLUMN_NAME = "gmt_modified"; private static final long DELTA = 100000000L; private DataSource dataSource; /** * 重試次數 */ private int retryTimes = DEFAULT_RETRY_TIMES; /** * 步長 */ private int step = DEFAULT_STEP; /** * 序列所在的表名 */ private String tableName = DEFAULT_TABLE_NAME; /** * 存儲序列名稱的列名 */ private String nameColumnName = DEFAULT_NAME_COLUMN_NAME; /** * 存儲序列值的列名 */ private String valueColumnName = DEFAULT_VALUE_COLUMN_NAME; /** * 存儲序列最后更新時間的列名 */ private String gmtModifiedColumnName = DEFAULT_GMT_MODIFIED_COLUMN_NAME;
接下來看一下nextRange方法:取得下一個可用的序列區間:

public SequenceRange nextRange(String name) throws SequenceException { if (name == null) { throw new IllegalArgumentException("序列名稱不能為空"); } long oldValue; long newValue; Connection conn = null; PreparedStatement stmt = null; ResultSet rs = null; for (int i = 0; i < retryTimes + 1; ++i) { try { conn = dataSource.getConnection(); stmt = conn.prepareStatement(getSelectSql()); stmt.setString(1, name); rs = stmt.executeQuery(); rs.next(); oldValue = rs.getLong(1); if (oldValue < 0) { StringBuilder message = new StringBuilder(); message.append("Sequence value cannot be less than zero, value = ").append(oldValue); message.append(", please check table ").append(getTableName()); throw new SequenceException(message.toString()); } if (oldValue > Long.MAX_VALUE - DELTA) { StringBuilder message = new StringBuilder(); message.append("Sequence value overflow, value = ").append(oldValue); message.append(", please check table ").append(getTableName()); throw new SequenceException(message.toString()); } newValue = oldValue + getStep(); } catch (SQLException e) { throw new SequenceException(e); } finally { closeResultSet(rs); rs = null; closeStatement(stmt); stmt = null; closeConnection(conn); conn = null; } try { conn = dataSource.getConnection(); stmt = conn.prepareStatement(getUpdateSql()); stmt.setLong(1, newValue); stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis())); stmt.setString(3, name); stmt.setLong(4, oldValue); int affectedRows = stmt.executeUpdate(); if (affectedRows == 0) { // retry continue; } return new SequenceRange(oldValue + 1, newValue); } catch (SQLException e) { throw new SequenceException(e); } finally { closeStatement(stmt); stmt = null; closeConnection(conn); conn = null; } } throw new SequenceException("Retried too many times, retryTimes = " + retryTimes); }
通過getSelectSql查詢最新的value值,然后加上步點,通過getUpdateSql更新到數據庫中

private String getSelectSql() { if (selectSql == null) { synchronized (this) { if (selectSql == null) { StringBuilder buffer = new StringBuilder(); buffer.append("select ").append(getValueColumnName()); buffer.append(" from ").append(getTableName()); buffer.append(" where ").append(getNameColumnName()).append(" = ?"); selectSql = buffer.toString(); } } } return selectSql; } private String getUpdateSql() { if (updateSql == null) { synchronized (this) { if (updateSql == null) { StringBuilder buffer = new StringBuilder(); buffer.append("update ").append(getTableName()); buffer.append(" set ").append(getValueColumnName()).append(" = ?, "); buffer.append(getGmtModifiedColumnName()).append(" = ? where "); buffer.append(getNameColumnName()).append(" = ? and "); buffer.append(getValueColumnName()).append(" = ?"); updateSql = buffer.toString(); } } } return updateSql; }
有一個特殊需要說明的,在update語句中,where需要把之前的value當成條件傳入。實現了類型version的樂觀鎖操作。如果同一個時間AB二台機器同時請求獲取到相同的value,進行update操作只有可能一條成功。失敗的會按retryTimes進行重試。
接下來看DefaultSequence,比較簡單,就不說明了

public class DefaultSequence implements Sequence { private final Lock lock = new ReentrantLock(); private SequenceDao sequenceDao; /** * 序列名稱 */ private String name; private volatile SequenceRange currentRange; public long nextValue() throws SequenceException { if (currentRange == null) { lock.lock(); try { if (currentRange == null) { currentRange = sequenceDao.nextRange(name); } } finally { lock.unlock(); } } long value = currentRange.getAndIncrement(); if (value == -1) { lock.lock(); try { for (;;) { if (currentRange.isOver()) { currentRange = sequenceDao.nextRange(name); } value = currentRange.getAndIncrement(); if (value == -1) { continue; } break; } } finally { lock.unlock(); } } if (value < 0) { throw new SequenceException("Sequence value overflow, value = " + value); } return value; } public SequenceDao getSequenceDao() { return sequenceDao; } public void setSequenceDao(SequenceDao sequenceDao) { this.sequenceDao = sequenceDao; } public String getName() { return name; } public void setName(String name) { this.name = name; } }