部分項目從kafka遷移至pulsar,近期使用中碰到了一些問題,勉強把大的坑踩完了,topic永駐,性能相關
pulsar概念類的東西官方文檔和基本介紹的博客很多,也就不重復說明了,更深入的東西也不涉及
只說下近期的使用體驗
設計理念上,雖然pulsar也支持持久化隊列,但和kafka對持久化的理解是不一樣的
kafka的持久化多少有一些數據倉儲的概念在里面,數據長期保存,通常是指定數據的保存日期,kafka后台按時清理過期數據
而pulsar的持久化,是更純粹的mq的持久化,只是為了保證數據會被消費到,沒有數據倉儲的概念,只是純粹的消息隊列,這種設計目標使得pulsar對硬盤空間的回收非常積極
純粹的消息隊列個人也用過些,例如nsq,因為pulsar處處是和kafka對比,因此一直覺得pulsar的默認配置,可以完全替代kafka,但出了不少問題
回收級別
topic
kafka里的topic無論是否有消費,寫入后的數據,都會穩定的一段時間,官方默認是7天,根據應用需要會調整
pulsar里的topic,寫入完成后,即使數據從來沒有被消費過,也會在活躍檢測機制下,被自動清除
pulsar默認會有以下場景:producer 寫入了100w條數據,還沒來得及consume,topic就被清除了
這可以通過幾個配置來調整,提高活躍間隔,但治村不治本。
可以禁用非活躍topic 自動回收,相關配置為
brokerDeleteInactiveTopicsEnabled=true
brokerDeleteInactiveTopicsFrequencySeconds=60
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=
本以為能解決問題,但是歷史數據仍然消失了,這是因為topic下還有另一個維度的回收,lenger的回收,topic還在,不刪topic,但是topic下的數據,還是會被清除
lenger
kafka的topic 數據以partition區分存儲多分區數據
pulsar 沒有partition的概念,而是有lenger概念,lenger可以理解為一個文件塊,topic下的數據以lenger的格式分塊存儲,lenger有序,lenger id全局自增,和消費的offset相關,低位的lenger消費完再消費高位的lenger
例如topic 下有以下3個lenger,按順序消費每個lenger的數據
lenger_id | offset_start | offset_end |
---|---|---|
1 | 0 | 50000 |
2 | 50001 | 100000 |
3 | 100001 | 150000 |
在topic級別的活躍檢測清除外,pulsar對topic下的lenger回收也較積極
如果所有已存在的consume的offset都漫過了第50000,也就漫過了lenger_id_1,則lenger_id_1 會被pulsar積極的回收掉,這也就導致了,即使關閉了非活躍topic的自動回收,topic是還在,但是topic下的lenger仍然會'消失'的情況
kafka的一個經典使用場景是producer先寫入完成,數據默認保持數周,然后多個consumer再分別去消費
pulsar的默認行為會導致一些初從kafka遷移來未預料到的問題(實際是pulsar的機制)
-
1 producer 寫kafka寫入topic完成,producer任務完成關閉連接,但是這時還沒有消費者來消費,下一個活躍topic檢測周期到了,該topic被完全清除,producer白寫了
-
2 producer 寫kafka寫入topic完成,consume-1從頭消費數據,已經消費了一半的數據,pulsar檢測到該topic低位的一半lenger都被所有consume(只有consume-1)消費過了,把低位的lenger清除,過了段時間consume-2從頭消費,低位lenger的數據已經被刪除,只能處理高位lenger下的數據
解決辦法
非活躍回收可以關閉,topic會保留
brokerDeleteInactiveTopicsEnabled=false
lenger會回收已被消費過的lenger塊,如果當前lenger有consumer,並且offset處在低位,則相比該lenger高位的都不會被清除
因此初期想到的辦法是在topic時,自動新建一個consumer,將offset鎖定在低位0(這是在測試時發現的,測試topic消費,臨時起了個conumser_test,該conumser_test一直將offset鎖定在低位,上位的數據並沒有被回收)
這種方法只是使用初期工程上的辦法,非常的不優雅,只是過渡使用,更好的辦法是調整pulsar的配置,以個人多年各種服務和組件的使用優化經驗,肯定是有這個配置的
目標是解決pulsar對topic下已被consumer消費過的lenger回收問題,使lenger保留更久的時間
以下是lenger回收的相關日志,這是查看源碼的主要線索
13:41:07.520 [bookkeeper-ml-workers-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-2] Removing ledger 30295 - size: 528330607
13:43:07.520 [bookkeeper-ml-workers-OrderedExecutor-6-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-1] Removing ledger 30300 - size: 486083111
13:43:07.525 [bookkeeper-ml-workers-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-0] Removing ledger 30253 - size: 504748015
分享下排查和調整過程(實際是走了一些彎路)
-
調高managedLedgerOffloadDeletionLagMs
首先是在pulsar默認的配置文件里找到可能相關的參數項,關鍵詞delete/remove之類
cat /pulsar/conf/bookkeeper.conf
找到了managedLedgerOffloadDeletionLagMs,把這個時間調大,以為能生效,但是上線后還是有ledger的刪除日志,問題未解決
觀查Removing ledger 的間隔,大概是4小時一個批次
managedLedgerOffloadDeletionLagMs=14400000
把 managedLedgerOffloadDeletionLagMs: "864000000"
調高以為能解決問題,但沒有生效,還有會有ledger的刪除日志
-
添加 managedLedgerOffloadDeletionLagMs
查到相關 issue https://github.com/apache/pulsar/issues/8220
google managedLedgerOffloadDeletionLagMs 查到的這個issues,但也不確定是不是問題
看了下源碼,又找到兩個可能的相關項,調整同樣不生效
按issues添加配置
ServiceConfiguration (broker.conf / standalone.conf)
managedLedgerOffloadDeletionLagMs
managedLedgerOffloadAutoTriggerSizeThresholdBytes
OffloadPolicies
managedLedgerOffloadDeletionLagInMillis
managedLedgerOffloadThresholdInBytes
-
查看源碼尋找線索
只好從源碼層面想辦法了-其實通常這種存儲服務會給一個的config文件,里面有所有的項,按關鍵詞+經驗找相關項調整即可,通常用不着看代碼解決
目標是先找到執行lenger刪除的代碼
13:41:07.520 [bookkeeper-ml-workers-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-2] Removing ledger 30295 - size: 528330607
13:43:07.520 [bookkeeper-ml-workers-OrderedExecutor-6-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-1] Removing ledger 30300 - size: 486083111
13:43:07.525 [bookkeeper-ml-workers-OrderedExecutor-5-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/a2de532c9c3142f58d43256f85146102-partition-0] Removing ledger 30253 - size: 504748015
日志里已經有目標類的信息,源碼里全文檢索'Removing ledger'可精確定位
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
ledgersStat = stat;
metadataMutex.unlock();
trimmerMutex.unlock();
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
asyncDeleteLedger(ls.getLedgerId(), ls);
}
for (LedgerInfo ls : offloadedLedgersToDelete) {
log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
ls.getSize());
asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
}
promise.complete(null);
}
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
metadataMutex.unlock();
trimmerMutex.unlock();
promise.completeExceptionally(e);
}
});
}
核心部分是
for (LedgerInfo ls : ledgersToDelete) {
log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
asyncDeleteLedger(ls.getLedgerId(), ls);
}
其實只要注釋掉這段代碼重新編譯pulsar便能完全禁用pulsar的lenger清除,但這個太粗暴了,重新編譯自定義的jar包,雖然也很簡單,但這只是最后的辦法,最好還是先從源碼里的相關配置里解決
ledgersToDelete 相關部分
List<LedgerInfo> ledgersToDelete = Lists.newArrayList();
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
if (log.isDebugEnabled()) {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- "
+ "expired: {} -- over-quota: {} -- current-ledger: {}",
name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
overRetentionQuota, currentLedger.getId());
}
if (ls.getLedgerId() == currentLedger.getId()) {
log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
ls.getLedgerId());
break;
} else if (expired) {
log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
ledgersToDelete.add(ls);
} else if (overRetentionQuota) {
log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());
ledgersToDelete.add(ls);
} else {
log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
break;
}
}
其實可以簡單看到ledgersToDelete.add 有兩種來源,一是expired,二是overRetentionQuota
只要使得ledger 不滿足這兩個條件即可
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota();
分別進入這兩個方法
private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
if (config.getRetentionTimeMillis() < 0) {
// Negative retention time equates to infinite retention
return false;
}
long elapsedMs = clock.millis() - ledgerTimestamp;
return elapsedMs > config.getRetentionTimeMillis();
}
private boolean isLedgerRetentionOverSizeQuota() {
// Handle the -1 size limit as "infinite" size quota
return config.getRetentionSizeInMB() >= 0
&& TOTAL_SIZE_UPDATER.get(this) > config.getRetentionSizeInMB() * 1024 * 1024;
}
看到了config關鍵詞,目標就在眼前了
public ManagedLedgerConfig setRetentionSizeInMB(long retentionSizeInMB) {
this.retentionSizeInMB = retentionSizeInMB;
return this;
}
public long getRetentionSizeInMB() {
return retentionSizeInMB;
}
public ManagedLedgerConfig setRetentionTime(int retentionTime, TimeUnit unit) {
this.retentionTimeMs = unit.toMillis(retentionTime);
return this;
}
public long getRetentionTimeMillis() {
return retentionTimeMs;
}
接下來找set的位置
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
還需要看retentionPolicies類
實際這個時候已經可以結合配置了,查看pulsar的配置文件,發現兩項配置,按經驗,可以確定就是這兩項,更新集群配置后,重啟,lenger被清除的問題便已解決
defaultRetentionTimeInMinutes=0
defaultRetentionSizeInMB=0
總結
實際檢索代碼retentionSizeInMB的時候,檢索到了一個faq.md文件,里面明確有提到類似的問題
之前只看了官方的文檔,官方文檔更是使用和概含說明,並沒有看github上項目的一些md文件,沒有注意到這項配置,上文說的彎路就是指這個faq.md文件
這種場景常見,個人肯定不是第一個注意到並想解決辦法的
How can I prevent an inactive topic to be deleted under any circumstance? I want to set no time or space limit for a certain namespace.
There’s not currently an option for “infinite” (though it sounds a good idea! maybe we could use -1 for that). The only option now is to use INT_MAX for retentionTimeInMinutes and LONG_MAX for retentionSizeInMB. It’s not “infinite” but 4085 years of retention should probably be enough!
另外提一個小坑,同硬件環境裸機搭了3個節點也比單實例的慢,也排除了zk gfs 因素
3點的分布式環境比單點測試要慢
https://github.com/apache/pulsar/issues/8570
咨詢了下官方,確認了是個參數配置問題
默認單點 journalSyncData=false
默認集群 journalSyncData=true
而kakfa的默認行為類似journalSyncData=false