RocketMQ-創建MappedFile本地文件


了解RocketMQ的都知道,它會保存所有的消息到本地文件。這個文件就是 MappedFile,每一個文件對應一個MappedFile.默認情況下大小位1g。

在MessageStoreConfig中的mapedFileSizeCommitLog設置,當然一半情況下是通過配置文件來設置的。文件路勁也都是在這個配置類里。

文件名格式是20位的數字,在這個類里生成,類似這樣的(00000001000000000000 00100000000000000000 09000000000020000000):

MappedFile由MappedFileQueue管理,下面是用來生成文件的方法:

   /**
     * 獲取最后一個 MappedFile,若不存在或文件已滿,則進行創建。
     * @param startOffset
     * @param needCreate
     * @return
     */
    public MapedFile getLastMapedFile(final long startOffset, boolean needCreate) {
        // 這個offeset是所有MappedFile的全局offset
        long createOffset = -1; // 創建文件開始offset。-1時,不創建
        MapedFile mapedFileLast = null;
        {
            this.readWriteLock.readLock().lock();
            if (this.mapedFiles.isEmpty()) {    // 一個映射文件都不存在
                createOffset = startOffset - (startOffset % this.mapedFileSize);
            } else {
                // 如果存在MappedFile對象,則獲取最后一個List<MapedFile> mapedFiles = new ArrayList<MapedFile>()
                mapedFileLast = this.mapedFiles.get(this.mapedFiles.size() - 1);
            }
            this.readWriteLock.readLock().unlock();
        }

        if (mapedFileLast != null && mapedFileLast.isFull()) {  // 最后一個文件已滿
            // 通過這里你可以知道這個offset是所有文件的offse,而不是一個mappedfile的offset
            createOffset = mapedFileLast.getFileFromOffset() + this.mapedFileSize;
        }

        // 創建文件
        if (createOffset != -1 && needCreate) {
            // 計算文件名。從此處我們可 以得知,MappedFile的文件命名規則:
            // 00000001000000000000 00100000000000000000 09000000000020000000二十位
            // fileName[n] = fileName[n - 1] + n * mappedFileSize fileName[0] = startOffset - (startOffset % this.mappedFileSize)
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            log.info("shang's log >>> create mappedFile nextFilePath:{}",nextFilePath);
            String nextNextFilePath =
                    this.storePath + File.separator
                            + UtilAll.offset2FileName(createOffset + this.mapedFileSize);
            //  下一個文件的地址都算出來了,也對,就新建的時候可以直接加一個mapedfilesize來計算,就是1g。1024*1024*1024
            log.info("shang's log >>> nextNextFilePath:{}",nextNextFilePath);

            MapedFile mapedFile = null;

            // 兩種方式創建文件
            if (this.allocateMapedFileService != null) {
                mapedFile =
                        this.allocateMapedFileService.putRequestAndReturnMapedFile(nextFilePath,
                                nextNextFilePath, this.mapedFileSize);
            } else {
                try {
                    mapedFile = new MapedFile(nextFilePath, this.mapedFileSize);
                } catch (IOException e) {
                    log.error("create mapedfile exception", e);
                }
            }

            if (mapedFile != null) {
                this.readWriteLock.writeLock().lock();
                if (this.mapedFiles.isEmpty()) {
                    mapedFile.setFirstCreateInQueue(true);
                }
                this.mapedFiles.add(mapedFile);
                this.readWriteLock.writeLock().unlock();
            }

            return mapedFile;
        }

        return mapedFileLast;
    }

有兩種方式生成mappedfile文件,第一種可以參考這篇文章:http://www.cnblogs.com/guazi/p/6850988.html

最終都是通過標黃代碼那樣來生成文件的。我們來看一下,就是MappedFile的構造方法:

    // 創建mappedfile文件
    public MapedFile(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
       // 這兩個都是與文件對應的mappedfile對象屬性變量 
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
       // 這兩個是rocketmq系統的共享靜態變量,用來標示虛擬內存使用情況和文件數量 TotalMapedVitualMemory.addAndGet(fileSize); TotalMapedFiles.incrementAndGet(); ok
= true; } catch (FileNotFoundException e) { log.error("create file channel " + this.fileName + " Failed. ", e); throw e; } catch (IOException e) { log.error("map file " + this.fileName + " Failed. ", e); throw e; } finally { if (!ok && this.fileChannel != null) { this.fileChannel.close(); } } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM