目錄

上圖是以CommitLog文件為例,展示了commitlog文件與MappedFile、MapppedFileQueue的關系。
你可以把磁盤里面commitlog文件夾下每個文件對應成MappedFile,而這個文件夾對應成MappedFileQueue。
先從MappedFileQueue看起
MappedFileQueue
private final String storePath;//存儲目錄
private final int mappedFileSize;//一個存儲文件的大小
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();//寫時復制MappedFile列表,按順序存儲各個commitlog對於的MappedFile
private final AllocateMappedFileService allocateMappedFileService;//分配MappedFile服務
private long flushedWhere = 0;//當前刷盤指針的位置,表示之前的數據已經刷到磁盤
private long committedWhere = 0;//當前數據提交指針
MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound)
/*
此處得到的第一個MappedFile的文件起始offset(即文件名)不一定是0,
之前的文件有可能已經被清除了。
*/
MappedFile mappedFile = this.getFirstMappedFile();
if (mappedFile != null) {
/*
之前的文件有可能已經被清除了(從this.mappedFiles里也會刪掉)。因此不能直接用offset / this.mappedFileSize
計算offset對應的文件索引。
*/
int index = (int) ((offset / this.mappedFileSize) - (mappedFile.getFileFromOffset() / this.mappedFileSize));
if (index < 0 || index >= this.mappedFiles.size()) {
LOG_ERROR.warn("Offset for {} not matched. Request offset: {}, index: {}, " +
"mappedFileSize: {}, mappedFiles count: {}",
mappedFile,
offset,
index,
this.mappedFileSize,
this.mappedFiles.size());
}
try {
return this.mappedFiles.get(index);
} catch (Exception e) {
if (returnFirstOnNotFound) {
return mappedFile;
}
LOG_ERROR.warn("findMappedFileByOffset failure. ", e);
}
}

MappedFile
public static final int OS_PAGE_SIZE = 1024 * 4;//系統頁緩存大小
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);//當前JVM實例中MappedFile虛擬內存
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);//當前JVM實例中MappedFile對象個數
protected final AtomicInteger wrotePosition = new AtomicInteger(0);//當前文件的寫指針
//ADD BY ChenYang
protected final AtomicInteger committedPosition = new AtomicInteger(0);//當前文件的提交指針
private final AtomicInteger flushedPosition = new AtomicInteger(0);//刷寫磁盤的指針
protected int fileSize;//文件大小
protected FileChannel fileChannel;//文件通道
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;//該buffer從transientStorePool申請,消息首先會放到這里,然后再提交到FileChannel
protected TransientStorePool transientStorePool = null;//堆外內存池,與上面的writeBuffer共同起作用
private String fileName;//文件名稱
private long fileFromOffset;//文件起始物理偏移地址
private File file;//文件本身
private MappedByteBuffer mappedByteBuffer;//FileChannel對應的內存映射
private volatile long storeTimestamp = 0;//文件最后一次內容寫入時間
private boolean firstCreateInQueue = false;//是否是MappedFileQueue中的第一個文件
init(final String fileName, final int fileSize)
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());//文件名即為文件的起始物理偏移量
boolean ok = false;
ensureDirOK(this.file.getParent());//確保文件的父目錄存在
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();//創建文件讀寫通道
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);//獲得映射buffer
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);//增加
TOTAL_MAPPED_FILES.incrementAndGet();//增加
ok = true;
}

int commit(final int commitLeastPages)
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
//沒開啟堆外內存池,不需要提交數據到fileChannel
return this.wrotePosition.get();//返回當前寫的位置
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {//???
commit0(commitLeastPages);
this.release();//???
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
//所有數據已經被提交到了FileChannel
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);//將writeBuffer歸還給transientStorePool
this.writeBuffer = null;
}
return this.committedPosition.get();//返回當前提交的位置
boolean isAbleToCommit(final int commitLeastPages)
int flush = this.committedPosition.get();
int write = this.wrotePosition.get();
if (this.isFull()) {
return true;//當前文件已經寫滿;,可以提交
}
if (commitLeastPages > 0) {
//本次要提交的數據頁大於等於允許提交的最小閾值,可以提交
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
}
return write > flush;//當前寫的位置大於提交的位置,可以提交
void commit0(final int commitLeastPages)
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();//創建一個buffer,與writeBuffer指向同一緩存區
byteBuffer.position(lastCommittedPosition);//回退buffer當前指針位置為lastCommittedPosition
byteBuffer.limit(writePos);//設置當前最大有效數據指針
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
