部分項目從kafka遷移至pulsar,近期使用中碰到了一些問題,勉強把大的坑踩完了,topic永駐,性能相關


部分項目從kafka遷移至pulsar,近期使用中碰到了一些問題,勉強把大的坑踩完了,topic永駐,性能相關

pulsar概念類的東西官方文檔和基本介紹的博客很多,也就不重復說明了,更深入的東西也不涉及

只說下近期的使用體驗

設計理念上,雖然pulsar也支持持久化隊列,但和kafka對持久化的理解是不一樣的

kafka的持久化多少有一些數據倉儲的概念在里面,數據長期保存,通常是指定數據的保存日期,kafka后台按時清理過期數據

而pulsar的持久化,是更純粹的mq的持久化,只是為了保證數據會被消費到,沒有數據倉儲的概念,只是純粹的消息隊列,這種設計目標使得pulsar對硬盤空間的回收非常積極

純粹的消息隊列個人也用過些,例如nsq,因為pulsar處處是和kafka對比,因此一直覺得pulsar的默認配置,可以完全替代kafka,但出了不少問題

回收級別

topic

kafka里的topic無論是否有消費,寫入后的數據,都會穩定的一段時間,官方默認是7天,根據應用需要會調整

pulsar里的topic,寫入完成后,即使數據從來沒有被消費過,也會在活躍檢測機制下,被自動清除

pulsar默認會有以下場景:producer 寫入了100w條數據,還沒來得及consume,topic就被清除了

這可以通過幾個配置來調整,提高活躍間隔,但治村不治本。

可以禁用非活躍topic 自動回收,相關配置為

brokerDeleteInactiveTopicsEnabled=true
brokerDeleteInactiveTopicsFrequencySeconds=60
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=

本以為能解決問題,但是歷史數據仍然消失了,這是因為topic下還有另一個維度的回收,lenger的回收,topic還在,不刪topic,但是topic下的數據,還是會被清除

lenger

kafka的topic 數據以partition區分存儲多分區數據

pulsar 沒有partition的概念,而是有lenger概念,lenger可以理解為一個文件塊,topic下的數據以lenger的格式分塊存儲,lenger有序,lenger id全局自增,和消費的offset相關,低位的lenger消費完再消費高位的lenger

例如topic 下有以下3個lenger,按順序消費每個lenger的數據

lenger_id offset_start offset_end
1 0 50000
2 50001 100000
3 100001 150000

在topic級別的活躍檢測清除外,pulsar對topic下的lenger回收也較積極

如果所有已存在的consume的offset都漫過了第50000,也就漫過了lenger_id_1,則lenger_id_1 會被pulsar積極的回收掉,這也就導致了,即使關閉了非活躍topic的自動回收,topic是還在,但是topic下的lenger仍然會'消失'的情況


kafka的一個經典使用場景是producer先寫入完成,數據默認保持數周,然后多個consumer再分別去消費

pulsar的默認行為會導致一些初從kafka遷移來未預料到的問題(實際是pulsar的機制)

  • 1 producer 寫kafka寫入topic完成,producer任務完成關閉連接,但是這時還沒有消費者來消費,下一個活躍topic檢測周期到了,該topic被完全清除,producer白寫了

  • 2 producer 寫kafka寫入topic完成,consume-1從頭消費數據,已經消費了一半的數據,pulsar檢測到該topic低位的一半lenger都被所有consume(只有consume-1)消費過了,把低位的lenger清除,過了段時間consume-2從頭消費,低位lenger的數據已經被刪除,只能處理高位lenger下的數據

解決辦法

非活躍回收可以關閉,topic會保留

brokerDeleteInactiveTopicsEnabled=false

lenger會回收已被消費過的lenger塊,如果當前lenger有consumer,並且offset處在低位,則相比該lenger高位的都不會被清除

因此初期想到的辦法是在topic時,自動新建一個consumer,將offset鎖定在低位0(這是在測試時發現的,測試topic消費,臨時起了個conumser_test,該conumser_test一直將offset鎖定在低位,上位的數據並沒有被回收)

這種方法只是使用初期工程上的辦法,非常的不優雅,只是過渡使用,更好的辦法是調整pulsar的配置,以個人多年各種服務和組件的使用優化經驗,肯定是有這個配置的


目標是解決pulsar對topic下已被consumer消費過的lenger回收問題,使lenger保留更久的時間

以下是lenger回收的相關日志,這是查看源碼的主要線索

13:41:07.520 [bookkeeper-ml-workers-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-2] Removing ledger 30295 - size: 528330607
13:43:07.520 [bookkeeper-ml-workers-OrderedExecutor-6-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-1] Removing ledger 30300 - size: 486083111
13:43:07.525 [bookkeeper-ml-workers-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-0] Removing ledger 30253 - size: 504748015

分享下排查和調整過程(實際是走了一些彎路)

  • 調高managedLedgerOffloadDeletionLagMs

首先是在pulsar默認的配置文件里找到可能相關的參數項,關鍵詞delete/remove之類

cat /pulsar/conf/bookkeeper.conf

找到了managedLedgerOffloadDeletionLagMs,把這個時間調大,以為能生效,但是上線后還是有ledger的刪除日志,問題未解決

觀查Removing ledger 的間隔,大概是4小時一個批次

managedLedgerOffloadDeletionLagMs=14400000

managedLedgerOffloadDeletionLagMs: "864000000"調高以為能解決問題,但沒有生效,還有會有ledger的刪除日志

  • 添加 managedLedgerOffloadDeletionLagMs

https://www.google.com/search?q=managedLedgerOffloadDeletionLagMs&oq=managedLedgerOffloadDeletionLagMs

查到相關 issue https://github.com/apache/pulsar/issues/8220

google managedLedgerOffloadDeletionLagMs 查到的這個issues,但也不確定是不是問題

看了下源碼,又找到兩個可能的相關項,調整同樣不生效

按issues添加配置

ServiceConfiguration (broker.conf / standalone.conf)
managedLedgerOffloadDeletionLagMs
managedLedgerOffloadAutoTriggerSizeThresholdBytes

OffloadPolicies
managedLedgerOffloadDeletionLagInMillis
managedLedgerOffloadThresholdInBytes

  • 查看源碼尋找線索

只好從源碼層面想辦法了-其實通常這種存儲服務會給一個的config文件,里面有所有的項,按關鍵詞+經驗找相關項調整即可,通常用不着看代碼解決

目標是先找到執行lenger刪除的代碼

13:41:07.520 [bookkeeper-ml-workers-OrderedExecutor-7-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-2] Removing ledger 30295 - size: 528330607
13:43:07.520 [bookkeeper-ml-workers-OrderedExecutor-6-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-1] Removing ledger 30300 - size: 486083111
13:43:07.525 [bookkeeper-ml-workers-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-0] Removing ledger 30253 - size: 504748015

日志里已經有目標類的信息,源碼里全文檢索'Removing ledger'可精確定位

https://github.com/apache/pulsar/blob/v2.6.1/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

            store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
                @Override
                public void operationComplete(Void result, Stat stat) {
                    log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
                            TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
                    ledgersStat = stat;
                    metadataMutex.unlock();
                    trimmerMutex.unlock();

                    for (LedgerInfo ls : ledgersToDelete) {
                        log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
                        asyncDeleteLedger(ls.getLedgerId(), ls);
                    }
                    for (LedgerInfo ls : offloadedLedgersToDelete) {
                        log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
                                ls.getSize());
                        asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
                    }
                    promise.complete(null);
                }

                @Override
                public void operationFailed(MetaStoreException e) {
                    log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
                    metadataMutex.unlock();
                    trimmerMutex.unlock();

                    promise.completeExceptionally(e);
                }
            });
        }

核心部分是

                    for (LedgerInfo ls : ledgersToDelete) {
                        log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
                        asyncDeleteLedger(ls.getLedgerId(), ls);
                    }

其實只要注釋掉這段代碼重新編譯pulsar便能完全禁用pulsar的lenger清除,但這個太粗暴了,重新編譯自定義的jar包,雖然也很簡單,但這只是最后的辦法,最好還是先從源碼里的相關配置里解決

ledgersToDelete 相關部分

        List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
        for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
                boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
                boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();

                if (log.isDebugEnabled()) {
                    log.debug(
                            "[{}] Checking ledger {} -- time-old: {} sec -- "
                                    + "expired: {} -- over-quota: {} -- current-ledger: {}",
                            name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
                            overRetentionQuota, currentLedger.getId());
                }
                if (ls.getLedgerId() == currentLedger.getId()) {
                    log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
                            ls.getLedgerId());
                    break;
                } else if (expired) {
                    log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
                    ledgersToDelete.add(ls);
                } else if (overRetentionQuota) {
                    log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());
                    ledgersToDelete.add(ls);
                } else {
                    log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
                    break;
                }
            }

其實可以簡單看到ledgersToDelete.add 有兩種來源,一是expired,二是overRetentionQuota

只要使得ledger 不滿足這兩個條件即可

                boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
                boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();

分別進入這兩個方法

    private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
        if (config.getRetentionTimeMillis() < 0) {
            // Negative retention time equates to infinite retention
            return false;
        }

        long elapsedMs = clock.millis() - ledgerTimestamp;
        return elapsedMs > config.getRetentionTimeMillis();
    }
    private boolean isLedgerRetentionOverSizeQuota() {
        // Handle the -1 size limit as "infinite" size quota
        return config.getRetentionSizeInMB() >= 0
                && TOTAL_SIZE_UPDATER.get(this) > config.getRetentionSizeInMB() * 1024 * 1024;
    }    

看到了config關鍵詞,目標就在眼前了

https://github.com/apache/pulsar/blob/v2.6.1/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

    public ManagedLedgerConfig setRetentionSizeInMB(long retentionSizeInMB) {
        this.retentionSizeInMB = retentionSizeInMB;
        return this;
    }
    public long getRetentionSizeInMB() {
        return retentionSizeInMB;
    }
    
    public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit unit) {
        this.retentionTimeMs = unit.toMillis(retentionTime);
        return this;
    }
    public long getRetentionTimeMillis() {
        return retentionTimeMs;
    }

接下來找set的位置

https://github.com/apache/pulsar/blob/v2.6.1/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());

還需要看retentionPolicies類

實際這個時候已經可以結合配置了,查看pulsar的配置文件,發現兩項配置,按經驗,可以確定就是這兩項,更新集群配置后,重啟,lenger被清除的問題便已解決

defaultRetentionTimeInMinutes=0
defaultRetentionSizeInMB=0

總結

實際檢索代碼retentionSizeInMB的時候,檢索到了一個faq.md文件,里面明確有提到類似的問題

之前只看了官方的文檔,官方文檔更是使用和概含說明,並沒有看github上項目的一些md文件,沒有注意到這項配置,上文說的彎路就是指這個faq.md文件

這種場景常見,個人肯定不是第一個注意到並想解決辦法的

https://github.com/apache/pulsar/blob/master/faq.md#how-can-i-prevent-an-inactive-topic-to-be-deleted-under-any-circumstance-i-want-to-set-no-time-or-space-limit-for-a-certain-namespace

How can I prevent an inactive topic to be deleted under any circumstance? I want to set no time or space limit for a certain namespace.
There’s not currently an option for “infinite” (though it sounds a good idea! maybe we could use -1 for that). The only option now is to use INT_MAX for retentionTimeInMinutes and LONG_MAX for retentionSizeInMB. It’s not “infinite” but 4085 years of retention should probably be enough!

另外提一個小坑,同硬件環境裸機搭了3個節點也比單實例的慢,也排除了zk gfs 因素

3點的分布式環境比單點測試要慢

https://github.com/apache/pulsar/issues/8570
咨詢了下官方,確認了是個參數配置問題
默認單點 journalSyncData=false
默認集群 journalSyncData=true

而kakfa的默認行為類似journalSyncData=false


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM