我們可以把loghub當作一個消息中間件來使用。如果能知道當前的消費進度,自然好了,否則消費情況一無所知,總是有點慌!
loghub消費分兩種情況,一是普通消費,二是消費組消費;
消費組消費,loghub服務端會記錄消費情況,這時可以通過調用服務端API進行偏移信息查詢。
普通消費則不同,需要自行維護偏移量,即只有自己知道偏移信息,自己處理延遲。我們主要討論這種情況。
一、 消費loghub數據的樣例如下:
// 普通消費 private static void consumeDataFromShard(int shardId) throws Exception { String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor(); System.out.println("cursor = " +cursor); try { while (true) { PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor); PullLogsResponse response = client.pullLogs(request); List<LogGroupData> logGroups = response.getLogGroups(); if (logGroups.isEmpty()) { return; } System.out.println(response.getCount()); System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor()); logGroups.forEach(rec1 -> { // do your biz }); cursor = response.getNextCursor(); Thread.sleep(200); } } catch(LogException e) { System.out.println(e.GetRequestId() + e.GetErrorMessage()); } }
因為消費一直在進行,想要進行監控,就插入一些埋點。我們可以使用的 Map 來保存每個 shard 的消費延遲情況。用一個 LoghubCursorDelayTransformer 描述具體信息。
/** * 消費偏移控制容器 */ public static final ConcurrentMap<Integer, LoghubCursorDelayTransformer> CONSUME_CURSOR_DELAY_TRANSFORMER = new ConcurrentHashMap<>(); /** * loghub 分區延遲管理器 * * @author weiy * @date 2019/11/27 */ public class LoghubCursorDelayTransformer { /** * 最后一次消費 loghub 數據的時間(大約) */ private int lastConsumeDataTime; /** * 消費延遲 (s) */ private int delay; /** * 分區 shard */ private int shard; /** * 記錄創建時間,如果創建時間已很久,說明該消費延遲應已失效 */ private long recordTime = System.currentTimeMillis(); public LoghubCursorDelayTransformer(int lastConsumeDataTime, int delay, int shard) { this.lastConsumeDataTime = lastConsumeDataTime; this.delay = delay; this.shard = shard; } public int getLastConsumeDataTime() { return lastConsumeDataTime; } public int getDelay() { return delay; } public int getShard() { return shard; } public long getRecordTime() { return recordTime; } }
二、 埋點插入監控數據
只要在每次消費完成之后,進行一次消費延遲的記錄就好了,具體記錄可以視情況而定。比如,每消費一批次之后記錄一次就是個不錯的選擇!
private static void consumeDataFromShard(int shardId) throws Exception { String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor(); System.out.println("cursor = " +cursor); try { while (true) { PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor); PullLogsResponse response = client.pullLogs(request); List<LogGroupData> logGroups = response.getLogGroups(); if (logGroups.isEmpty()) { // 沒有更多數據,以當前系統時間作為最后消費時間(並不關心實際生產者是否有在產生舊數據) metricConsumeDelay((int)(System.currentTimeMillis() / 1000), shardId, -1); return; } System.out.println(response.getCount()); System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor()); logGroups.forEach(rec1 -> { // do your biz }); // 每批次消費完成后,記錄一次消費延遲情況 // 此處取 最后一個消息的時間作為批次時間點 int lastestConsumeTime = logGroups.get(logGroups.size() -1).GetFastLogGroup().getLogs(0).getTime(); metricConsumeDelay(lastestConsumeTime, shardId, null); cursor = response.getNextCursor(); Thread.sleep(200); } } catch(LogException e) { System.out.println(e.GetRequestId() + e.GetErrorMessage()); } } /** * 記錄消費延遲信息 * * @param lastConsumeTime 最后消費時間(如果沒有獲取到數據,則使用系統時間代替),單位為 s秒 * @param shard 分區id * @param calculatedDelay 已計算好的延時,為null時需要根據當前系統時間計算 */ public static void metricConsumeDelay(int lastConsumeTime, int shard, Integer calculatedDelay) { if(calculatedDelay == null) { calculatedDelay = (int)(System.currentTimeMillis() / 1000) - lastConsumeTime; } LoghubCursorDelayTransformer delayTransformer = new LoghubCursorDelayTransformer( lastConsumeTime, calculatedDelay, shard); CONSUME_CURSOR_DELAY_TRANSFORMER.put(shard, delayTransformer); }
如上的延遲統計是不准確的,如果想准確統計,應使用 cursor 與 最后的偏移進行對比才行。如下:
private static void consumeDataFromShard(int shardId) throws Exception { String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor(); System.out.println("cursor = " +cursor); try { while (true) { PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor); PullLogsResponse response = client.pullLogs(request); List<LogGroupData> logGroups = response.getLogGroups(); if (logGroups.isEmpty()) { // 沒有更多數據,以當前系統時間作為最后消費時間(並不關心實際生產者是否有在產生舊數據) metricConsumeDelay((int)(System.currentTimeMillis() / 1000), shardId, -1); return; } System.out.println(response.getCount()); System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor()); logGroups.forEach(rec1 -> { // do your biz }); cursor = response.getNextCursor(); // 從loghub-api 換取具體時間,計算延遲,可能會導致性能下降厲害 int lastestConsumeTime = exchangeTimeWithCursorFromApi(cursor, shardId); int delay = getMaxTimeOffsetFromApi(shardId) - lastestConsumeTime; metricConsumeDelay(lastestConsumeTime, shardId, delay); Thread.sleep(200); } } catch(LogException e) { System.out.println(e.GetRequestId() + e.GetErrorMessage()); } } /** * 從loghub-api中獲取對應cursor的時間 * * @param cursor 指定游標(當前) * @param shardId 分區id * @return 數據時間 * @throws LogException 查詢異常時拋出 */ public static int exchangeTimeWithCursorFromApi(String cursor, int shardId) throws LogException { GetCursorTimeResponse cursorTimeResponse = client.GetCursorTime(project, logStore, shardId, cursor); return cursorTimeResponse.GetCursorTime(); } /** * 從loghub-api中獲取最大的時間偏移,以便精確計算消費延遲 * * @param shardId 分區id * @return 最大時間 * @throws LogException 查詢異常時拋出 */ public static int getMaxTimeOffsetFromApi(int shardId) throws LogException { String cursor = client.GetCursor(project, logStore, shardId, Consts.CursorMode.END).GetCursor(); return exchangeTimeWithCursorFromApi(cursor, shardId); }
三、 監控數據暴露
通過prometheus進行數據暴露!
/** * 暴露延遲信息數據,啟動時調用即可 */ public static void exposeMetricData() { // 統計loghub消費延時 CollectorRegistry.defaultRegistry.register(new Collector() { @Override public List<MetricFamilySamples> collect() { List<MetricFamilySamples> mfs = new ArrayList<>(); final ConcurrentMap<Integer, LoghubCursorDelayTransformer> cursorHolder = CONSUME_CURSOR_DELAY_TRANSFORMER; // With lastest time labels GaugeMetricFamily consumeTimeGauge = new GaugeMetricFamily("my_shard_consume_lastest", "last consume time watch help", Collections.singletonList("shard")); // With delay labels GaugeMetricFamily delayGauge = new GaugeMetricFamily("my_shard_consume_delay", "delay msg help", Collections.singletonList("shard")); // todo: 注意優化消費長時間暫停情況 for (LoghubCursorDelayTransformer delayTransformer : cursorHolder.values()) { delayGauge.addMetric( Collections.singletonList(delayTransformer.getShard() + ""), delayTransformer.getDelay()); consumeTimeGauge.addMetric(Collections.singletonList("" + delayTransformer.getShard()), delayTransformer.getLastConsumeDataTime()); } mfs.add(delayGauge); mfs.add(consumeTimeGauge); return mfs; } }); }
是不是很簡單?自定義一個 Collector 就可以了。接入信息的其他細節可以參考之前的文章。
四、 消費組的監控?
消費端實踐
private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com"; private static String sProject = "ali-cn-hangzhou-sls-admin"; private static String sLogstore = "sls_operation_log"; private static String sConsumerGroup = "consumerGroupX"; private static String sAccessKeyId = ""; private static String sAccessKey = ""; public static void groupConsume() throws LogHubClientWorkerException, InterruptedException { // 第二個參數是消費者名稱,同一個消費組下面的消費者名稱必須不同,可以使用相同的消費組名稱,不同的消費者名稱在多台機器上啟動多個進程,來均衡消費一個Logstore,這個時候消費者名稱可以使用機器ip來區分。第9個參數(maxFetchLogGroupSize)是每次從服務端獲取的LogGroup數目,使用默認值即可,如有調整請注意取值范圍(0,1000]。 LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR); ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config); Thread thread = new Thread(worker); //Thread運行之后,Client Worker會自動運行,ClientWorker擴展了Runnable接口。 thread.start(); Thread.sleep(60 * 60 * 1000); //調用worker的Shutdown函數,退出消費實例,關聯的線程也會自動停止。 worker.shutdown(); //ClientWorker運行過程中會生成多個異步的Task,Shutdown之后最好等待還在執行的Task安全退出,建議sleep 30s。 Thread.sleep(30 * 1000); } // 消費業務端樣例 public class SampleLogHubProcessor implements ILogHubProcessor { private int shardId; // 記錄上次持久化 checkpoint 的時間。 private long mLastCheckTime = 0; public void initialize(int shardId) { this.shardId = shardId; } // 消費數據的主邏輯,這里面的所有異常都需要捕獲,不能拋出去。 public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) { // 這里簡單的將獲取到的數據打印出來。 for (LogGroupData logGroup : logGroups) { FastLogGroup flg = logGroup.GetFastLogGroup(); System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s", flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID())); System.out.println("Tags"); for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) { FastLogTag logtag = flg.getLogTags(tagIdx); System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue())); } for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) { FastLog log = flg.getLogs(lIdx); System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount()); for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) { FastLogContent content = log.getContents(cIdx); System.out.println(content.getKey() + "\t:\t" + content.getValue()); } } } long curTime = System.currentTimeMillis(); // 每隔 30 秒,寫一次 checkpoint 到服務端,如果 30 秒內,worker crash, // 新啟動的 worker 會從上一個 checkpoint 取消費數據,有可能有少量的重復數據。 if (curTime - mLastCheckTime > 30 * 1000) { try { //參數true表示立即將checkpoint更新到服務端,為false會將checkpoint緩存在本地,后台默認隔60s會將checkpoint刷新到服務端。 checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } mLastCheckTime = curTime; } return null; } // 當 worker 退出的時候,會調用該函數,用戶可以在此處做些清理工作。 public void shutdown(ILogHubCheckPointTracker checkPointTracker) { //將消費斷點保存到服務端。 try { checkPointTracker.saveCheckPoint(true); } catch (LogHubCheckPointException e) { e.printStackTrace(); } } } class SampleLogHubProcessorFactory implements ILogHubProcessorFactory { public ILogHubProcessor generatorProcessor() { // 生成一個消費實例。 return new SampleLogHubProcessor(); } }
實現原理即定期向loghub中寫入 checkpoint, 以便可以查詢。既然數據都寫入了 loghub 服務端,那么也能很容易在后台看到消費延遲了。
不過我們也可以通過api獲取消費情況,自行另外監控也行。(只是意義不大)
可以通過如下方式獲取當前消費情況,與最后的數據偏移做比較,就可以得到延遲情況了。
List<ConsumerGroupShardCheckPoint> checkPoints = client.GetCheckPoint(project, sLogstore, sConsumerGroup).getCheckPoints();
五、 grafana 延遲監控配置
前面通過prometheus獲取到了延遲數據,接入到grafana后,就可以進行展示了。我們先來看下最終效果!
配置本身是很簡單的,有個注意的點是需要整合兩個坐標數據,因為一個消費延遲數據,另一個是具體的消費時間,這樣就可以同步查看了。
配置右邊的Y軸坐標需要使用 series override 選項,使用正則進行匹配如: /最后消費時間shard:.*/i
時間選項需要乘以1000變為毫秒如: test_shard_consume_lastest * 1000
監控思路可以擴展到以拉取模式進行消費的消息系統。