rocketMQ 的 msgId


producer 在發送消息的時候,會生成一個 "唯一" 的 msgId,broker 會為這個 msgId 創建哈希索引

UNIQ_KEY 由客戶端生成

org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID

msgId 由 前綴 + 內容 組成:
前綴
ip 地址,進程號,classLoader 的 hashcode
內容
時間差(當前時間減去當月一日),計數器

static {
    byte[] ip;
    try {
        // 獲取 ip 地址,字節數組
        ip = UtilAll.getIP();
    } catch (Exception e) {
        ip = createFakeIP();
    }
    LEN = ip.length + 4 + 4 + 4 + 2;
    ByteBuffer tempBuffer = ByteBuffer.allocate(ip.length + 4 + 4);
    tempBuffer.position(0);
    // 寫入 ip
    tempBuffer.put(ip);
    tempBuffer.position(ip.length);
    // 寫入進程號
    tempBuffer.putInt(UtilAll.getPid());
    tempBuffer.position(ip.length + 4);
    // 寫入 hashcode
    tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
    // 二進制轉十六進制字符串
    FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
    // 設置起始時間為當月1日
    setStartTime(System.currentTimeMillis());
    COUNTER = new AtomicInteger(0);
}

byte 數組轉十六進制字符串

// org.apache.rocketmq.common.UtilAll#bytes2string
// final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
public static String bytes2string(byte[] src) { char[] hexChars = new char[src.length * 2]; for (int j = 0; j < src.length; j++) { // & 過之后,byte 轉成 int int v = src[j] & 0xFF; // 無符號右移 4 位,高位補 0 ,即取字節的高 4 位 hexChars[j * 2] = HEX_ARRAY[v >>> 4]; // 取字節低 4 位 hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; } return new String(hexChars); }

設置開始時間

// org.apache.rocketmq.common.message.MessageClientIDSetter#setStartTime
private synchronized static void setStartTime(long millis) {
    Calendar cal = Calendar.getInstance();
    cal.setTimeInMillis(millis);
    cal.set(Calendar.DAY_OF_MONTH, 1);
    cal.set(Calendar.HOUR_OF_DAY, 0);
    cal.set(Calendar.MINUTE, 0);
    cal.set(Calendar.SECOND, 0);
    cal.set(Calendar.MILLISECOND, 0);
    // 當月1日為開始時間
    startTime = cal.getTimeInMillis();
    // 下月1日
    cal.add(Calendar.MONTH, 1);
    nextStartTime = cal.getTimeInMillis();
}
// org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqIDBuffer
private static byte[] createUniqIDBuffer() {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
    long current = System.currentTimeMillis();
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.position(0);
    // 當前時間減去當月一日
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    // 計數器
    buffer.putShort((short) COUNTER.getAndIncrement());
    return buffer.array();
}
public static String createUniqID() {
    // 1 個字節,8 位,每 4 位一個十六進制字符
    StringBuilder sb = new StringBuilder(LEN * 2);
    // 前綴:ip,pid,hashcode
    sb.append(FIX_STRING);
    // 時間差 + 計數器
    sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
    return sb.toString();
}

 

ByteBufer 寫入數據的時候,默認是大端字節序,比如寫入一個 int,先寫高位字節,再寫低位字節。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM