producer 在發送消息的時候,會生成一個 "唯一" 的 msgId,broker 會為這個 msgId 創建哈希索引
UNIQ_KEY 由客戶端生成
org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID
msgId 由 前綴 + 內容 組成:
前綴
ip 地址,進程號,classLoader 的 hashcode
內容
時間差(當前時間減去當月一日),計數器
static { byte[] ip; try { // 獲取 ip 地址,字節數組 ip = UtilAll.getIP(); } catch (Exception e) { ip = createFakeIP(); } LEN = ip.length + 4 + 4 + 4 + 2; ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 4 + 4); tempBuffer.position(0); // 寫入 ip tempBuffer.put(ip); tempBuffer.position(ip.length); // 寫入進程號 tempBuffer.putInt(UtilAll.getPid()); tempBuffer.position(ip.length + 4); // 寫入 hashcode tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); // 二進制轉十六進制字符串 FIX_STRING = UtilAll.bytes2string(tempBuffer.array()); // 設置起始時間為當月1日 setStartTime(System.currentTimeMillis()); COUNTER = new AtomicInteger(0); }
byte 數組轉十六進制字符串
// org.apache.rocketmq.common.UtilAll#bytes2string
// final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
public static String bytes2string(byte[] src) { char[] hexChars = new char[src.length * 2]; for (int j = 0; j < src.length; j++) { // & 過之后,byte 轉成 int int v = src[j] & 0xFF; // 無符號右移 4 位,高位補 0 ,即取字節的高 4 位 hexChars[j * 2] = HEX_ARRAY[v >>> 4]; // 取字節低 4 位 hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; } return new String(hexChars); }
設置開始時間
// org.apache.rocketmq.common.message.MessageClientIDSetter#setStartTime private synchronized static void setStartTime(long millis) { Calendar cal = Calendar.getInstance(); cal.setTimeInMillis(millis); cal.set(Calendar.DAY_OF_MONTH, 1); cal.set(Calendar.HOUR_OF_DAY, 0); cal.set(Calendar.MINUTE, 0); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); // 當月1日為開始時間 startTime = cal.getTimeInMillis(); // 下月1日 cal.add(Calendar.MONTH, 1); nextStartTime = cal.getTimeInMillis(); }
// org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqIDBuffer private static byte[] createUniqIDBuffer() { ByteBuffer buffer = ByteBuffer.allocate(4 + 2); long current = System.currentTimeMillis(); if (current >= nextStartTime) { setStartTime(current); } buffer.position(0); // 當前時間減去當月一日 buffer.putInt((int) (System.currentTimeMillis() - startTime)); // 計數器 buffer.putShort((short) COUNTER.getAndIncrement()); return buffer.array(); }
public static String createUniqID() { // 1 個字節,8 位,每 4 位一個十六進制字符 StringBuilder sb = new StringBuilder(LEN * 2); // 前綴:ip,pid,hashcode sb.append(FIX_STRING); // 時間差 + 計數器 sb.append(UtilAll.bytes2string(createUniqIDBuffer())); return sb.toString(); }
ByteBufer 寫入數據的時候,默認是大端字節序,比如寫入一個 int,先寫高位字節,再寫低位字節。