This post walks through how the NameNode writes its edit log.
For the NameNode logs, the cluster is configured as follows:
<property>
<name>dfs.nameservices</name>
<value>sync</value>
<description>Logical name for this new nameservice</description>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///home/wudi/hadoop/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://host1:port1;host2:port2;host3:port3/sync</value>
</property>
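This configuration tells the NameNode to write its edit log to two places. The first is /home/wudi/hadoop/nn on the NameNode's local file system. The second is qjournal, a shared edit log directory: the NameNode writes each edit to several JournalNodes, acting as the Paxos Proposer while the JournalNodes act as Acceptors, so that the copies of the edit log stay consistent even though they are written in several places. On my cluster there are 3 JournalNode processes running.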
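To make the "write to several JournalNodes and still agree" idea concrete, here is a minimal sketch of a majority-quorum write. It is not the QuorumJournalManager code: the class QuorumWriter, the method writeToQuorum, and the modelling of a JournalNode as a Predicate<String> that returns true when it acknowledges a record are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical helper: sends one record to every node in parallel. */
class QuorumWriter {
    /**
     * Returns true once a majority of nodes has acknowledged the record,
     * false if that does not happen within timeoutMs.
     */
    static boolean writeToQuorum(List<Predicate<String>> nodes,
                                 String record, long timeoutMs) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        int majority = nodes.size() / 2 + 1;          // 2 out of 3, 3 out of 5, ...
        CountDownLatch acks = new CountDownLatch(majority);
        ExecutorService pool = Executors.newFixedThreadPool(nodes.size());
        try {
            for (Predicate<String> node : nodes) {
                // Each node is written on its own thread; only successes count.
                pool.submit(() -> {
                    if (node.test(record)) {
                        acks.countDown();
                    }
                });
            }
            // Wait for the majority only, not for the slowest node.
            return acks.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```

With three nodes the write succeeds as long as any two of them acknowledge, which is why a single failed JournalNode does not block the NameNode.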
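Overall, the NameNode writes the edit log from multiple threads, and the edit log keeps a double buffer: one buffer is filled with new data while the other is being flushed. A thread must take a lock before appending an edit to the buffer. After the append it checks whether the buffered data has reached a threshold; if so it runs a sync, which actually writes the buffer out. A thread can also call sync explicitly to force the buffer out. sync takes a lock too, the same lock used for appending: it takes the lock, swaps the buffers, releases the lock, and then writes the buffer out outside the lock. With my configuration the data goes to two places: the NameNode's local edit log directory /home/wudi/hadoop/nn, and qjournal, where the three JournalNode processes are written in parallel.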
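The double-buffer scheme described above can be sketched roughly as follows. This is a simplified illustration, not Hadoop's own double-buffer class: DoubleBufferedLog, append, sync, and the separate flushLock are names and choices assumed here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical double-buffered log: fill one buffer, flush the other. */
class DoubleBufferedLog {
    private ByteArrayOutputStream current = new ByteArrayOutputStream(); // being filled
    private ByteArrayOutputStream ready = new ByteArrayOutputStream();   // being flushed
    private final Object flushLock = new Object(); // allows one flusher at a time
    private final OutputStream sink;
    private final int threshold;

    DoubleBufferedLog(OutputStream sink, int threshold) {
        this.sink = sink;
        this.threshold = threshold;
    }

    /** Appends under the lock; returns true when the buffer reached the threshold. */
    synchronized boolean append(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        current.write(bytes, 0, bytes.length);
        return current.size() >= threshold;
    }

    /** Swaps the buffers under the lock, then does the slow write outside it. */
    void sync() {
        synchronized (flushLock) {
            ByteArrayOutputStream toFlush;
            synchronized (this) {            // same lock as append()
                ByteArrayOutputStream tmp = ready;
                ready = current;
                current = tmp;               // writers now fill the empty buffer
                toFlush = ready;
            }
            try {
                toFlush.writeTo(sink);       // append() is not blocked during this I/O
                sink.flush();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            toFlush.reset();
        }
    }
}
```

The point of the swap is that writers hold the lock only for an in-memory append or a pointer swap, never for disk or network I/O.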
Now let's look at the code.
Initialization of FSEditLog:
FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
  isSyncRunning = false;
  this.conf = conf;
  this.storage = storage;
  metrics = NameNode.getNameNodeMetrics();
  lastPrintTime = now();
  // If this list is empty, an error will be thrown on first use
  // of the editlog, as no journals will exist
  this.editsDirs = Lists.newArrayList(editsDirs);
  this.sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
}
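this.editsDirs is the union of the directories configured in dfs.namenode.name.dir and dfs.namenode.shared.edits.dir (strictly speaking, the local part is read from dfs.namenode.edits.dir, which defaults to the value of dfs.namenode.name.dir).
this.sharedEditsDirs is the value of dfs.namenode.shared.edits.dir.
initJournals then creates one journal for each directory: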
private synchronized void initJournals(List<URI> dirs) {
  int minimumRedundantJournals = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
  journalSet = new JournalSet(minimumRedundantJournals);
  for (URI u : dirs) {
    boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
        .contains(u);
    if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
      StorageDirectory sd = storage.getStorageDirectory(u);
      if (sd != null) {
        journalSet.add(new FileJournalManager(conf, sd, storage),
            required, sharedEditsDirs.contains(u));
      }
    } else {
      journalSet.add(createJournal(u), required,
          sharedEditsDirs.contains(u));
    }
  }
  if (journalSet.isEmpty()) {
    LOG.error("No edits directories configured!");
  }
}
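The argument passed in is this.editsDirs: one entry is the local edit log directory and the other is qjournal. JournalSet manages multiple edit log directories, both local and shared, so with my cluster configuration the journalSet holds two JournalAndStream objects. A JournalAndStream wraps a concrete edit log output stream together with the manager that owns that stream. For qjournal the manager is QuorumJournalManager; for a local directory it is FileJournalManager. Different managers use different edit log output streams, and every concrete stream extends the base class EditLogOutputStream. Each time the edit log rolls to a new segment, the manager's startLogSegment method is called to create a new output stream: QuorumJournalManager produces a QuorumOutputStream, and FileJournalManager produces an EditLogFileOutputStream. Users can plug in their own manager through a configuration parameter of the form dfs.namenode.edits.journal-plugin.<scheme>, where the suffix is the URI scheme; for qjournal the key is dfs.namenode.edits.journal-plugin.qjournal. When the upper layer, FSEditLog, calls startLogSegment to roll to a new segment, it is calling JournalSet's startLogSegment, which in turn calls startLogSegment on each manager it contains, and that is how the two output streams are created.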
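The fan-out from JournalSet to its managers is essentially a composite. A stripped-down sketch is shown below; SegmentManager, SegmentStream, and FanOutJournals are stand-in names invented for this example and deliberately much simpler than the real JournalManager, EditLogOutputStream, and JournalSet types.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for an edit log output stream. */
interface SegmentStream {
    void write(String op);
}

/** Stand-in for a journal manager: it knows how to open a stream for a segment. */
interface SegmentManager {
    SegmentStream startLogSegment(long firstTxId);
}

/** Stand-in for JournalSet: one call fans out to every registered manager. */
class FanOutJournals {
    private final List<SegmentManager> managers = new ArrayList<>();
    private final List<SegmentStream> streams = new ArrayList<>();

    void add(SegmentManager manager) {
        managers.add(manager);
    }

    /** Rolls to a new segment: each manager creates its own kind of stream. */
    void startLogSegment(long firstTxId) {
        streams.clear();
        for (SegmentManager manager : managers) {
            streams.add(manager.startLogSegment(firstTxId));
        }
    }

    /** A single logical write reaches every open stream. */
    void write(String op) {
        for (SegmentStream stream : streams) {
            stream.write(op);
        }
    }

    int openStreams() {
        return streams.size();
    }
}
```

Registering one manager for the local directory and one for qjournal would leave two open streams after startLogSegment, matching the two JournalAndStream objects described above.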
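Next, writing the edit log.
Typically the NameNode writes an edit by first calling void logEdit(final FSEditLogOp op) and then public void logSync(). The split exists mainly to batch edits and raise throughput: logEdit writes into the buffer, and logSync then does the real flush.
Start with FSEditLog's logEdit: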
void logEdit(final FSEditLogOp op) {
  synchronized (this) {
    assert isOpenForWrite() :
      "bad state: " + state;
    // wait if an automatic sync is scheduled
    waitIfAutoSyncScheduled();
    long start = beginTransaction();
    op.setTransactionId(txid);
    try {
      editLogStream.write(op);
    } catch (IOException ex) {
      // All journals failed, it is handled in logSync.
    }
    endTransaction(start);
    // check if it is time to schedule an automatic sync
    if (!shouldForceSync()) {
      return;
    }
    isAutoSyncScheduled = true;
  }
  // sync buffered edit log entries to persistent store
  logSync();
}
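First, logEdit checks whether a sync has already been scheduled by another thread (the isAutoSyncScheduled flag). If so, another thread is about to sync, so this thread waits; once that thread has swapped the buffers it calls doneWithAutoSyncScheduling, which resets isAutoSyncScheduled to false and wakes the waiting threads. Next, the edit is assigned a transaction id. Ids come from the global counter txid, which increases by 1 each time, and the id obtained is saved in a thread-local variable. The op is then written into the buffers of QuorumOutputStream and EditLogFileOutputStream. After that logEdit calls shouldForceSync(), which checks shouldForceSync() on every stream and returns true as soon as one of them does, meaning enough data has accumulated in the buffer and it is time for a sync. In that case logEdit schedules a sync by setting isAutoSyncScheduled to true and then calls logSync(). QuorumOutputStream always returns false, while EditLogFileOutputStream returns true once its buffer holds more than 512KB (not configurable). If the buffer holds less than 512KB, logEdit() returns immediately without calling logSync, so this is where edits get batched.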
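The bookkeeping in logEdit (wait while an automatic sync is scheduled, hand out a transaction id, ask every stream whether it wants a sync) can be sketched as below. EditAppender is a hypothetical class; each stream is reduced to a BooleanSupplier standing in for its shouldForceSync(), and the actual buffer write is left out.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical model of the logEdit bookkeeping; the buffer write is omitted. */
class EditAppender {
    private long txid = 0;                       // global counter, guarded by this
    private boolean autoSyncScheduled = false;   // guarded by this
    private final ThreadLocal<Long> myTxid = ThreadLocal.withInitial(() -> 0L);
    private final List<BooleanSupplier> streamWantsSync;

    EditAppender(List<BooleanSupplier> streamWantsSync) {
        this.streamWantsSync = streamWantsSync;
    }

    /** Returns true when the caller has just scheduled a sync and must run it. */
    synchronized boolean logEdit() {
        // Do not append while another thread is about to swap the buffers.
        while (autoSyncScheduled) {
            try {
                wait(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        txid++;                 // ids increase by 1
        myTxid.set(txid);       // remembered per thread for the later sync
        // (the op would be written to every stream's buffer here)
        for (BooleanSupplier wantsSync : streamWantsSync) {
            if (wantsSync.getAsBoolean()) {      // one full stream is enough
                autoSyncScheduled = true;
                return true;
            }
        }
        return false;           // below the threshold: keep batching
    }

    /** Called by the syncing thread once the buffers have been swapped. */
    synchronized void doneWithAutoSyncScheduling() {
        autoSyncScheduled = false;
        notifyAll();
    }

    long myTransactionId() {
        return myTxid.get();
    }
}
```

A caller that gets true back is expected to run the sync and then call doneWithAutoSyncScheduling(); until it does, other callers of logEdit() block.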
Now look at logSync():
public void logSync() {
  long syncStart = 0;
  // Fetch the transactionId of this thread.
  long mytxid = myTransactionId.get().txid;
  boolean sync = false;
  try {
    EditLogOutputStream logStream = null;
    synchronized (this) {
      try {
        printStatistics(false);
        // if somebody is already syncing, then wait
        while (mytxid > synctxid && isSyncRunning) {
          try {
            wait(1000);
          } catch (InterruptedException ie) {
          }
        }
        //
        // If this transaction was already flushed, then nothing to do
        //
        if (mytxid <= synctxid) {
          numTransactionsBatchedInSync++;
          if (metrics != null) {
            // Metrics is non-null only when used inside name node
            metrics.incrTransactionsBatchedInSync();
          }
          return;
        }
        // now, this thread will do the sync
        syncStart = txid;
        isSyncRunning = true;
        sync = true;
        // swap buffers
        try {
          if (journalSet.isEmpty()) {
            throw new IOException("No journals available to flush");
          }
          editLogStream.setReadyToFlush();
        } catch (IOException e) {
          final String msg =
              "Could not sync enough journals to persistent storage " +
              "due to " + e.getMessage() + ". " +
              "Unsynced transactions: " + (txid - synctxid);
          LOG.fatal(msg, new Exception());
          IOUtils.cleanup(LOG, journalSet);
          terminate(1, msg);
        }
      } finally {
        // Prevent RuntimeException from blocking other log edit write
        doneWithAutoSyncScheduling();
      }
      //editLogStream may become null,
      //so store a local variable for flush.
      logStream = editLogStream;
    }
    // do the sync
    long start = now();
    try {
      if (logStream != null) {
        logStream.flush();
      }
    } catch (IOException ex) {
      synchronized (this) {
        final String msg =
            "Could not sync enough journals to persistent storage. "
            + "Unsynced transactions: " + (txid - synctxid);
        LOG.fatal(msg, new Exception());
        IOUtils.cleanup(LOG, journalSet);
        terminate(1, msg);
      }
    }
    long elapsed = now() - start;
    if (metrics != null) { // Metrics non-null only when used inside name node
      metrics.addSync(elapsed);
    }
  } finally {
    // Prevent RuntimeException from blocking other log edit sync
    synchronized (this) {
      if (sync) {
        synctxid = syncStart;
        isSyncRunning = false;
      }
      this.notifyAll();
    }
  }
}
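First, logSync checks whether another thread is already syncing (isSyncRunning). If one is, and this thread's mytxid is greater than synctxid, the largest transaction id synced so far, the thread waits. When the other thread finishes its sync it updates synctxid, sets isSyncRunning to false, and wakes this thread. On waking, the thread checks whether the edit for its mytxid has already been synced; if so it returns. Otherwise it starts a sync itself, setting isSyncRunning to true to tell the other threads. It then calls setReadyToFlush to swap the buffers and doneWithAutoSyncScheduling to let other threads write into the buffer again, and performs the actual flush. Finally it updates synctxid, sets isSyncRunning to false, and wakes the other threads.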
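The effect of comparing mytxid with synctxid is a group commit: one thread's flush covers every edit appended before it swapped the buffers, and the other threads return without doing any I/O. A minimal sketch under that reading follows; GroupCommitLog and its flush counter are illustrative assumptions, and the real buffer swap and stream flush are replaced by a counter increment.

```java
/** Hypothetical group-commit skeleton modelled on the logSync flow above. */
class GroupCommitLog {
    private long txid = 0;                // last id handed out, guarded by this
    private long synctxid = 0;            // last id known to be flushed, guarded by this
    private boolean syncRunning = false;  // guarded by this
    private int flushCount = 0;           // written only by the single active flusher

    /** Assigns the next transaction id (the buffer append is omitted). */
    synchronized long append() {
        return ++txid;
    }

    /** Makes sure everything up to mytxid is flushed, piggybacking where possible. */
    void sync(long mytxid) {
        long syncStart;
        synchronized (this) {
            // Somebody else is flushing and our edit is not covered yet: wait.
            while (mytxid > synctxid && syncRunning) {
                try {
                    wait(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting", e);
                }
            }
            if (mytxid <= synctxid) {
                return;                   // already flushed by another thread's sync
            }
            syncStart = txid;             // this flush will cover everything so far
            syncRunning = true;
            // (the buffer swap would happen here, still under the lock)
        }
        try {
            flushCount++;                 // stands in for the slow flush, outside the lock
        } finally {
            synchronized (this) {
                synctxid = syncStart;
                syncRunning = false;
                notifyAll();              // wake threads waiting in the loop above
            }
        }
    }

    synchronized int flushCount() {
        return flushCount;
    }

    synchronized long syncedTxid() {
        return synctxid;
    }
}
```

For example, after three appends a single sync for the second transaction advances synctxid to 3, so later sync calls for the first and third transactions return immediately.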
That's all.
References
hadoop-hdfs-2.4.1.jar