請問你知道分布式系統的預寫日志設計模式么?


原文地址:https://martinfowler.com/articles/patterns-of-distributed-systems/wal.html

Write-Ahead log 預寫日志

預寫日志(WAL,Write-Ahead Log)將每次狀態更新抽象為一個命令追加寫入一個日志中,這個日志只追加寫入,也就是順序寫入,所以 IO 會很快。相比於更新存儲的數據結構並且更新落盤這個隨機 IO 操作,寫入速度更快了,並且也提供了一定的持久性,也就是數據不會丟失,可以根據這個日志恢復數據。

背景介紹

如果遇到了服務器存儲數據失敗,例如已經確認客戶端的請求,但是存儲過程中,重啟進程導致真正存儲的數據沒有落盤,在重啟后,也需要保證已經答應客戶端的請求數據更新真正落盤成功。

解決方案

image

將每一個更新,抽象為一個指令,並將這些指令存儲在一個文件中。每個進程順序追加寫各自獨立的一個文件,簡化了重啟后日志的處理,以及后續的在線更新操作。每個日志記錄有一個獨立 id,這個 id 可以用來實現分段日志(Segmented Log)或者最低水位線(Low-Water Mark)清理老的日志。日志更新可以使用單一更新隊列(Singular Update Queue)這種設計模式。

日志記錄的結構類似於:

class WALEntry {
  //日志id
  private final Long entryId;
  //日志內容
  private final byte[] data;
  //類型
  private final EntryType entryType;
  //時間
  private long timeStamp;
}

在每次重新啟動時讀取日志文件,回放所有日志條目來恢復當前數據狀態。

假設有一內存鍵值對數據庫:

class KVStore {
  private Map<String, String> kv = new HashMap<>();

  public String get(String key) {
      return kv.get(key);
  }

  public void put(String key, String value) {
      appendLog(key, value);
      kv.put(key, value);
  }

  private Long appendLog(String key, String value) {
      return wal.writeEntry(new SetValueCommand(key, value).serialize());
  }
}

put 操作被抽象為 SetValueCommand,在更新內存 hashmap 之前將其序列化並存儲在日志中。SetValueCommand 可以序列化和反序列化。

class SetValueCommand {
  final String key;
  final String value;

  public SetValueCommand(String key, String value) {
      this.key = key;
      this.value = value;
  }

  @Override
  public byte[] serialize() {
      try {
          //序列化
          var baos = new ByteArrayOutputStream();
          var dataInputStream = new DataOutputStream(baos);
          dataInputStream.writeInt(Command.SetValueType);
          dataInputStream.writeUTF(key);
          dataInputStream.writeUTF(value);
          return baos.toByteArray();

      } catch (IOException e) {
          throw new RuntimeException(e);
      }
  }

  public static SetValueCommand deserialize(InputStream is) {
      try {
          //反序列化
          DataInputStream dataInputStream = new DataInputStream(is);
          return new SetValueCommand(dataInputStream.readUTF(), dataInputStream.readUTF());
      } catch (IOException e) {
          throw new RuntimeException(e);
      }
  }
}

這可以確保即使進程重啟,這個 hashmap 也可以通過在啟動時讀取日志文件來恢復。

class KVStore {
  public KVStore(Config config) {
      this.config = config;
      this.wal = WriteAheadLog.openWAL(config);
      this.applyLog();
  }

  public void applyLog() {
      List<WALEntry> walEntries = wal.readAll();
      applyEntries(walEntries);
  }

  private void applyEntries(List<WALEntry> walEntries) {
      for (WALEntry walEntry : walEntries) {
          Command command = deserialize(walEntry);
          if (command instanceof SetValueCommand) {
              SetValueCommand setValueCommand = (SetValueCommand)command;
              kv.put(setValueCommand.key, setValueCommand.value);
          }
      }
  }

  public void initialiseFromSnapshot(SnapShot snapShot) {
      kv.putAll(snapShot.deserializeState());
  }
}

實現考慮

首先是保證 WAL 日志真的寫入了磁盤。所有編程語言提供的文件處理庫提供了一種機制,強制操作系統將文件更改flush落盤。在flush時,需要考慮的是一種權衡。對於日志的每一條記錄都flush一次,保證了強持久性,但是嚴重影響了性能並且很快會成為性能瓶頸。如果是異步flush,性能會提高,但是如果在flush前程序崩潰,則有可能造成日志丟失。大部分的實現都采用批處理,減少flush帶來的性能影響,同時也盡量少丟數據。

另外,我們還需要保證日志文件沒有損壞。為了處理這個問題,日志條目通常伴隨 CRC 記錄寫入,然后在讀取文件時進行驗證。

同時,采用單個日志文件可能變得很難管理(很難清理老日志,重啟時讀取文件過大)。為了解決這個問題,通常采用之前提到的分段日志(Segmented Log)或者最低水位線(Low-Water Mark)來減少程序啟動時讀取的文件大小以及清理老的日志。

最后,要考慮重試帶來的重復問題,也就是冪等性。由於 WAL 日志僅附加,在發生客戶端通信失敗和重試時,日志可能包含重復的條目。當讀取日志條目時,可能會需要確保重復項被忽略。但是如果存儲類似於 HashMap,其中對同一鍵的更新是冪等的,則不需要排重,但是可能會存在 ABA 更新問題。一般都需要實現某種機制來標記每個請求的唯一標識符並檢測重復請求。

舉例

各種 MQ 中的類似於 CommitLog 的日志

MQ 中的消息存儲,由於消息隊列的特性導致消息存儲和日志類似,所以一般用日志直接作為存儲。這個消息存儲一般就是 WAL 這種設計模式,以 RocketMQ 為例子:

RocketMQ:
image

RocketMQ 存儲首先將消息存儲在 Commitlog 文件之中,這個文件采用的是 mmap (文件映射內存)技術寫入與保存。關於這個技術,請參考另一篇文章JDK核心JAVA源碼解析(5) - JAVA File MMAP原理解析

當消息來時,寫入文件的核心方法是MappedFileappendMessagesInner方法:

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;
    //獲取當前寫入位置
    int currentPos = this.wrotePosition.get();
    //如果當前寫入位置小於文件大小則嘗試寫入
    if (currentPos < this.fileSize) {
        //mappedByteBuffer是公用的,在這里不能修改其position影響讀取
        //mappedByteBuffer是文件映射內存抽象出來的文件的內存ByteBuffer
        //對這個buffer的寫入,就相當於對文件的寫入
        //所以通過slice方法生成一個共享原有相同內存的新byteBuffer,設置position
        //如果writeBuffer不為空,則證明啟用了TransientStorePool,使用其中緩存的內存寫入
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        //分單條消息還有批量消息的情況
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        //增加寫入大小
        this.wrotePosition.addAndGet(result.getWroteBytes());
        //更新最新消息保存時間
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

RocketMQ 將消息存儲在 Commitlog 文件后,異步更新 ConsumeQueue 還有 Index 文件。這個 ConsumeQueue 還有 Index 文件可以理解為存儲狀態,CommitLog 在這里扮演的就是 WAL 日志的角色:只有寫入到 ConsumeQueue 的消息才會被消費者消費,只有 Index 文件中存在的記錄才能被讀取定位到。如果消息成功寫入 CommitLog 但是異步更新還沒執行,RocketMQ 進程掛掉了,這樣就存在了不一致。所以在 RocketMQ 啟動的時候,會通過如下機制保證 Commitlog 與 ConsumeQueue 還有 Index 的最終一致性.

入口是DefaultMessageStoreload方法:

public boolean load() {
    boolean result = true;
    try {
        //RocketMQ Broker啟動時會創建${ROCKET_HOME}/store/abort文件,並添加JVM shutdownhook刪除這個文件
        //通過這個文件是否存判斷是否為正常退出
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

        //加載延遲隊列消息,這里先忽略
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }

        //加載 Commit Log 文件
        result = result && this.commitLog.load();

        //加載 Consume Queue 文件
        result = result && this.loadConsumeQueue();

        if (result) {
            //加載存儲檢查點
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            //加載 index,如果不是正常退出,銷毀所有索引上次刷盤時間小於索引文件最大消息時間戳的文件
            this.indexService.load(lastExitOK);
            //進行 recover 恢復之前狀態
            this.recover(lastExitOK);
            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }
    if (!result) {
        this.allocateMappedFileService.shutdown();
    }
    return result;
}

進行恢復是DefaultMessageStorerecover方法:

private void recover(final boolean lastExitOK) {
    long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    //根據上次是否正常退出,采用不同的恢復方式
    if (lastExitOK) {
        this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
    } else {
        this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
    }

    this.recoverTopicQueueTable();
}

當上次正常退出時:

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        //只掃描最后三個文件
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //檢驗存儲消息是否有效
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            //如果有效,添加這個偏移
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            //如果有效,但是大小是0,代表到了文件末尾,切換文件
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            //只有有無效的消息,就在這里停止,之后會丟棄掉這個消息之后的所有內容
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        //根據有效偏移量,刪除這個偏移量以后的所有文件,以及所有文件(正常是只有最后一個有效文件,而不是所有文件)中大於這個偏移量的部分
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        //根據 commit log 中的有效偏移量,清理 consume queue
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        //所有commit log都刪除了,那么偏移量就從0開始
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

當上次沒有正常退出時:

public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 從最后一個文件開始,向前尋找第一個正常的可以恢復消息的文件
        // 從這個文件開始恢復消息,因為里面的消息有成功寫入過 consumer queue 以及 index 的,所以從這里恢復一定能保證最終一致性
        // 但是會造成某些已經寫入過 consumer queue 的消息再次寫入,也就是重復消費。
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            //尋找第一個有正常消息的文件
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        //如果小於0,就恢復所有 commit log,或者代表沒有 commit log
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            //驗證消息有效性
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            //如果消息有效
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;

                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        //如果允許消息重復轉發,則需要判斷當前消息是否消息偏移小於已確認的偏移,只有小於的進行重新分發
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            //重新分發消息,也就是更新 consume queue 和 index
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        //重新分發消息,也就是更新 consume queue 和 index
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                //大小為0代表已經讀完,切換下一個文件
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }

        //更新偏移
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        //清理
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
    // Commitlog case files are deleted
    else {
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}

總結起來就是:

  • 首先,根據 abort 文件是否存在判斷上次是否正常退出。
  • 對於正常退出的:
    • 掃描倒數三個文件,記錄有效消息的偏移
    • 掃描到某個無效消息結束,或者掃描完整個文件
    • 設置最新偏移,同時根據這個偏移量清理 commit log 和 consume queue
  • 對於沒有正常退出的:
    • 從最后一個文件開始,向前尋找第一個正常的可以恢復消息的文件
    • 從這個文件開始恢復並重發消息,因為里面的消息有成功寫入過 consumer queue 以及 index 的,所以從這里恢復一定能保證最終一致性。但是會造成某些已經寫入過 consumer queue 的消息再次寫入,也就是重復消費。
    • 更新偏移,清理

數據庫

基本上所有的數據庫都會有 WAL 類似的設計,例如 MySQL 的 Innodb redo log 等等。
image

image

一致性存儲

例如 ZK 還有 ETCD 這樣的一致性中間件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM