使用Redis實現高並發分布式序列號生成服務


序列號的構成

為建立良好的數據治理方案,作數據掌握、分析、統計、商業智能等用途,業務數據的編碼制定通常都會遵循一定的規則,一般來講,都會有自己的編碼規則和自增序列構成。比如我們常見的身份證號、銀行卡號、社保電腦號等等。

以某公司產品標識碼(代表該產品的唯一編碼)的構成為例:

規則定義:商品款號(8位)+顏色號(3位)+號型碼(3位) (共14位)

其標識碼為:62X19001 001 46A 

業務含義為: 2009年男裝秋冬季仿毛套西黑色170A版 

簡單來講,業務編碼是由規則序列構成,規則是允許定義和編輯的,序列通常要求並發安全。整個序列號生成規則要求讀寫並發安全。

序列號生成方案

由於Redis的高性能,高並發和數據一致性的保證,以及斷電數據不丟失,分布式擴展能力等優勢。我們采用Redis存儲並持久化序列和業務規則來配置和管理整個序列號的產生。

規則定義舉例:前綴+時間(YYYYMMDD)+所使用的序列(指定長度),那么產生的序列號類似於SO20150520000124

具體規則可根據實際業務需求來設計。

實現要求:整個生成過程使用Jedis完成,保證原子事務。並通過壓力測試。

盡管規則的配置更適合使用表結構來存儲,但為了構建分布式的數據庫集群,通常都會采用分庫和分表(分片)的方式,在這種要求下,我們將規則的配置和序列都放在Redis,以便於提供獨立的全局序列生成服務.而不用擔心數據庫伸縮帶來的影響.

序列號生成服務部署圖

序列號實現方案

1.         規則配置管理

在Redis的設計中,要想實現比如

select * from users where user.location="shanghai"

這樣的查詢,是沒辦法通過value進行比較得出結果的。但是可以通過不同的數據結構類型組合來做到這一點。比如如下的數據定義

users:1 {name:Jack,age:28,location:shanghai}

users:2 {name:Frank,age:30,location:beijing}

users:location:shanghai [1]

其中users:1 users:2 分別定義了兩個用戶信息,通過Redis中的hash數據結構,而users:location:shanghai 記錄了所有上海的用戶id,通過集合數據結構實現。

這樣通過兩次簡單的Redis命令調用就可以實現我們上面的查詢。

Jedis jedis = jedisPool.getResource();

Set<String> shanghaiIDs = jedis.smembers("users:location:shanghai"); //遍歷該set

 //... //通過hgetall獲取對應的user信息

jedis.hgetAll("users:" + shanghaiIDs[0]);

通過諸如以上的設計,可以實現簡單的條件查詢。但是這樣的問題也很多,首先需要多維護一個ID索引的集合,其次對於一些復雜查詢無能為力(當然也不能期望Redis實現像關系數據庫那樣的查詢,Redis不是干這的)。針對本序列號生成方案,這種方式完全是夠用的,可以直接參考本節的代碼示例。

如果想更進一步,Redis2.6集成了Lua(Redis是用ANSI C寫的,可以想象支持Lua是一件很自然的事),可以通過eval命令,直接在RedisServer環境中執行Lua腳本,就是說可以讓你用Lua腳本,對Redis中存儲的key value進行操作,這個意義就大了,甚至可以將系統所需的各種業務寫成一個個lua腳本,提前加載進入Redis,然后對於請求的響應,只需要調用一個個lua腳本就行。(當然這些操作也完全可以使用Jedis來完成,但顯然lua效率更高)

比如,現在我們要實現一個‘所有年齡(age)大於28歲的用戶(user)’這樣一個查詢,那么通過以下的Lua腳本就可以實現

public static final String SCRIPT = "local resultKeys={};"

 + "for k,v in ipairs(KEYS) do "

 + "  local tmp = redis.call('hget', v, 'age');"

+ "  if tmp > ARGV[1] then "

+ "   table.insert(resultKeys,v);"

 + "  end;"

 + " end;"

 + "return resultKeys;";

執行腳本代碼

Jedis jedis = jedisPool.getResource();

jedis.auth(auth);

List<String> keys = Arrays.asList(allUserKeys);

List<String> args = new ArrayList<>();

args.add("28");

List<String> resultKeys = (List<String>)jedis.evalsha(funcKey, keys, args);

return resultKeys;

注意,以上的代碼中使用的是evalsha命令,該命令參數的不是直接用Lua腳本字符串,而是提前已經加載到Redis中的函數的一個SHA索引,通過以下的代碼將系統中所有需要執行的函數提前加載到Redis中,通常在自己的系統中維護一個函數哈希表funcTable,后續需要實現什么功能,就從函數表中獲取對應功能的SHA索引,通過evalsha調用就行。

String shaFuncKey = jedis.scriptLoad(SCRIPT);//加載腳本到Redis中,獲取sha索引 funcTable.put(funcName_age, shaFuncKey);//添加到系統維護的函數表中

通過以上的方法,便可以使較為復雜的查詢放到Redis中去執行,提高效率。

可見,想要將全部業務代碼都使用lua腳本來實現的業務系統是可能的,lua腳本等同於關系型數據庫中的存儲過程或者函數。當然,全部使用lua的開發成本未必不大,畢竟不是關系型數據庫,存儲思維不同。

代碼示例:

//配置生成規則(CRUD):

//假設銷售單號生成規則:prefix+time+seq

//生成之后的結果類似於:SO20150520023014

//------模擬常規數據庫操作------

//添加數據

shardedJedis.hset("rules", "somaster",

 "name:銷售單號,prefix:SO,time:YYYYMMDD,seq:seq_so,seq_len:6");

shardedJedis.hset("rules", "pomaster",

 "name:采購單號,prefix:PO,time:YYYYMMDD,seq:seq_po,seq_len:6");

shardedJedis.hset("rules", "test", "name:test,prefix:PO,time:YYYYMMDD,seq:seq_po,seq_len:6");

//判斷某個值是否存在

System.out.println(shardedJedis.hexists("rules", "test"));

// 刪除指定的值

System.out.println(shardedJedis.hdel("rules", "test"));

// 獲取指定的值

System.out.println(shardedJedis.hget("rules", "somaster"));

// 獲取所有的keys

System.out.println(shardedJedis.hkeys("rules"));

// 獲取所有的values

System.out.println(shardedJedis.hvals("rules"));

//更新 = 插入同名的key

System.out.println("update before:"+shardedJedis.hvals("rules"));

System.out.println(shardedJedis.hset("rules", "somaster", "new test somaster"));

System.out.println("update after:"+shardedJedis.hvals("rules"));

 

我示例代碼中使用的是hash而不是直接使用key-value來存儲,是更優的方案。至此CRUD都能直接滿足了,最后,你獲取到所有values,需要自己處理分頁。

也可以使用list和set組合的方式存儲。這種方式是將list index和set key對應起來,根據序號進行分頁是容易的,但在每次新增和刪除時,都需要修改序號和key的對應關系。

兩者相比,使用hash的成本顯然更低,也不易出錯。

2.         序列號的使用

Redis中對序列的生成早已考慮周到,使用單線程操作序列的方式以保證並發安全,同時,使用也極其簡單。更多操作詳見官網API

代碼示例

//sequence

System.out.println("seq:"+shardedJedis.incr("seq"));

System.out.println("seq:"+shardedJedis.incr("seq"));

System.out.println("another_seq:"+shardedJedis.incr("another_seq"));

最后,生成序列服務只需要通過對應的規則名,獲取規則表達式,解析之后結合序列號,最終生成即可。

並發測試

這里我們使用CyclicBarrier做並發測試,CyclicBarrier會開啟指定數量的線程,等待這些線程就緒之后,同時執行測試內容,以達到真實並發的測試目的。

Loadrunner等壓力測試工具也能完成測試任務。

public class CyclicBarrierTest {
	//初始化
	JedisPoolTest test = new JedisPoolTest();
    public static void main(String[] args) {
         int count = 1000;
         CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
         ExecutorService executorService = Executors.newFixedThreadPool(count);
         for (int i = 0; i < count; i++)
              executorService.execute(new CyclicBarrierTest().new Task(cyclicBarrier));

         executorService.shutdown();
         while (!executorService.isTerminated()) {
              try {
                   Thread.sleep(10);
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
         }
    }

    public class Task implements Runnable {
         private CyclicBarrier cyclicBarrier;

         public Task(CyclicBarrier cyclicBarrier) {
              this.cyclicBarrier = cyclicBarrier;
         }

         @Override
         public void run() {
              try {
                   // 等待所有任務准備就緒
                   cyclicBarrier.await();
                   // 測試內容
                // 待測試的url
					String host = "http://172.25.2.14/seqno?";
					String para = "sysTemNo=ERP&seqName=WH-ZONE-ID&iVar=00";
					System.out.println(host + para);
					URL url = new URL(host);
					HttpURLConnection connection = (HttpURLConnection) url.openConnection();
					// connection.setRequestMethod("POST");
					// connection.setRequestProperty("Proxy-Connection", "Keep-Alive");
					connection.setDoOutput(true);
					connection.setDoInput(true);
					PrintWriter out = new PrintWriter(connection.getOutputStream());
					out.print(para);
					out.flush();
					out.close();
					BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
					String line = "";
					String result = "";
					while ((line = in.readLine()) != null) {
						result += line;
					}
					System.out.println(result);
//                   System.out.println(test.getJedis().incr("seq"));
//             	  System.out.println(test.getShardedJedis().incr("seq"));
              } catch (Exception e) {
                   e.printStackTrace();
              }
         }
    }
}

測試結果:

單台Redis未經任何設置,500並發100% pass,到1000並發時只有67%pass率,此時存在連接超時和被拒的情形。但不存在任何重復號碼或丟失號碼。500並發數其實已經完全滿足我當前系統的要求。考慮到Redis本身可以集群擴展,完全能夠應對將來更高的並發需求。  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM