消費阿里雲日志服務SLS


此文檔只關心消費接入,不關心日志接入,只關心消費如何接入,可直接跳轉到【sdk消費接入]

SLS簡介

  • 日志服務:
    • 日志服務(Log Service,簡稱 LOG)是針對日志類數據的一站式服務,在阿里巴巴集團經歷大量大數據場景錘煉而成。您無需開發就能快捷完成日志數據采集、消費、投遞以及查詢分析等功能,提升運維、運營效率,建立 DT 時代海量日志處理能力
  • 功能
    • 實時采集與消費(LogHub)
    • 投遞數倉(LogShipper)
    • 查詢與實時分析(Search/Analytics)

接入消費流程

賬號問題
  • 如果消費自己的日志,直接使用自己阿里雲賬號的key
  • 如果消費他人提供的日志,需要別人創建的子賬號或者賬號(推薦子賬號,無安全問題)中的key,使用自己賬號無法連接通
接入點EndPoint

消費接入(java)

概念
對象 明細
Log 日志、日志組表示等基本概念
Project 項目
Config 配置
LogStore 日志庫
Index 索引
Shard 分區
ConsumerGroup 消費組
配置

就如同使用 API 和日志服務服務端交互一樣,使用 SDK 也需要指定一些基本配置。目前,所有語言的 SDK 都定義了一個 Client 類作為入口類,這些基本配置信息在該入口類的構造時指定。

具體包括如下幾項:

  • 服務入口(Endpoint):確認 Client 需要訪問的服務入口
    • 當使用 SDK 時,首先需要明確訪問的日志服務 Project 所在 Region(如“華東 1 (杭州)”、“華北 1 (青島)”等),然后選擇與其匹配的日志服務入口初始化 Client。該服務入口與 API 中的 服務入口 定義一致
    • 當選擇 Client 的 Endpoint 時,必須要保證您需要訪問的 Project 的 Region 和 Endpoint 對應的 Region 一致,否則 SDK 將無法訪問您指定的 Project
    • 由於 Client 實例只能在構造時指定該服務入口,如果需要訪問不同 Region 里的 Project,則需要用不同的 Endpoint 構建不同的 Client 實例
    • 目前,所有 API 的服務入口僅支持 HTTP 協議。
    • 如果在阿里雲 ECS 虛擬機內使用 SDK,您還可以使用內網 Endpoint 避免公網帶寬開銷,具體請參考 服務入口
  • 阿里雲訪問秘鑰(AccessKeyId/AccessKeySecret):指定 Client 訪問日志服務時使用的訪問秘鑰
skd消費接入
原始接入
  • 參見參考文檔,要消費日志服務中的數據,請盡量不要直接使用SDK的拉數據接口,我們提供了一個高級消費庫消費組消費,該庫屏蔽了日志服務的實現細節,並且提供了負載均衡、按序消費等高級功能
消費組接入
  • 同一個消費組下面的消費者名稱必須不同,否則相同的消費者會同時消費logstore同份數據,造成數據重復

  • 協同消費庫(Consumer Library)是對日志服務中日志進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數據讀取的區別在於,用戶無需關心日志服務的實現細節,只需要專注於業務邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關心

  • 消費組(ConsumerGroup)

    • 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的數據,消費者之間不會重復消費數據
  • 消費組(Consumer)

    • 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不同
  • shared消費組、消費組關系

    • 一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者
      • 每個shard只會分配到一個消費者
      • 一個消費者可以同時擁有多個shard
      • 新的消費者加入一個消費組,這個消費組下面的shard從屬關系會調整,以達到消費負載均衡的目的,但是上面的分配原則不會變,分配過程對用戶透明
  • maven
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.15</version>
</dependency>

阿里雲client依賴log4j,如果項目中使用的logback,需要增加轉換log4j到logback的轉換

 <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.25</version>
</dependency>

java文件
  • main
public class Main {
  // 日志服務域名,根據實際情況填寫
  private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
  // 日志服務項目名稱,根據實際情況填寫
  private static String sProject = "ali-cn-hangzhou-sls-admin";
  // 日志庫名稱,根據實際情況填寫
  private static String sLogstore = "sls_operation_log";
  // 消費組名稱,根據實際情況填寫
  private static String sConsumerGroup = "consumerGroupX";
  // 消費數據的ak,根據實際情況填寫
  private static String sAccessKeyId = "";
  private static String sAccessKey = "";
  public static void main(String []args) throws LogHubClientWorkerException, InterruptedException
  {
       // 第二個參數是消費者名稱,同一個消費組下面的消費者名稱必須不同,可以使用相同的消費組名稱,不同的消費者名稱在多台機器上啟動多個進程,來均衡消費一個Logstore,這個時候消費者名稱可以使用機器ip來區分。第9個參數(maxFetchLogGroupSize)是每次從服務端獲取的LogGroup數目,使用默認值即可,如有調整請注意取值范圍(0,1000]
      LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
      ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        //Thread運行之后,Client Worker會自動運行,ClientWorker擴展了Runnable接口。
       thread.start();
       Thread.sleep(60 * 60 * 1000);
       //調用worker的Shutdown函數,退出消費實例,關聯的線程也會自動停止。
       worker.shutdown();
       //ClientWorker運行過程中會生成多個異步的Task,Shutdown之后最好等待還在執行的Task安全退出,建議sleep 30s。
       Thread.sleep(30 * 1000);
  }
}
  • SampleLogHubProcessor
public class SampleLogHubProcessor implements ILogHubProcessor 
{
  private int mShardId;
  // 記錄上次持久化 check point 的時間
  private long mLastCheckTime = 0; 
  public void initialize(int shardId) 
  {
      mShardId = shardId;
  }
  // 消費數據的主邏輯,這里面的所有異常都需要捕獲,不能拋出去。
  public String process(List<LogGroupData> logGroups,
          ILogHubCheckPointTracker checkPointTracker) 
  {
      // 這里簡單的將獲取到的數據打印出來
      for(LogGroupData logGroup: logGroups){
          FastLogGroup flg = logGroup.GetFastLogGroup();
          System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                  flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
          System.out.println("Tags");
          for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
              FastLogTag logtag = flg.getLogTags(tagIdx);
              System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
          }
          for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
              FastLog log = flg.getLogs(lIdx);
              System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
              for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                  FastLogContent content = log.getContents(cIdx);
                  System.out.println(content.getKey() + "\t:\t" + content.getValue());
              }
          }
      }
      long curTime = System.currentTimeMillis();
      // 每隔 30 秒,寫一次 check point 到服務端,如果 30 秒內,worker crash,
      // 新啟動的 worker 會從上一個 checkpoint 其消費數據,有可能有少量的重復數據
      if (curTime - mLastCheckTime >  30 * 1000) 
      {
          try 
          {
              //參數true表示立即將checkpoint更新到服務端,為false會將checkpoint緩存在本地,后台默認隔60s會將checkpoint刷新到服務端。
              checkPointTracker.saveCheckPoint(true);
          } 
          catch (LogHubCheckPointException e) 
          {
              e.printStackTrace();
          }
          mLastCheckTime = curTime;
      } 
      return null;  
  }
  // 當 worker 退出的時候,會調用該函數,用戶可以在此處做些清理工作。
  public void shutdown(ILogHubCheckPointTracker checkPointTracker) 
  {
      //將消費斷點保存到服務端。
      try {
          checkPointTracker.saveCheckPoint(true);
      } catch (LogHubCheckPointException e) {
          e.printStackTrace();
      }
  }
}
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory 
{
  public ILogHubProcessor generatorProcessor()
  {   
      // 生成一個消費實例
      return new SampleLogHubProcessor();
  }
}
  • 上述代碼,工廠類可以用lambda替換
  • client繼承Runnable,必須以thread方式啟動
  • client原理:是啟動線程,底層定時發送心跳給服務端,拿到要消費的必要信息,異步提交http請求任務(線程池),請求處理數據。所以調用client.shutdown,方法並不能立馬把所有任務關閉,最好有個時間差,同時client中運行線程標記是否關閉的變量不是線程安全的,所以關閉的時候,依然有可能提交請求任務處理
錯誤處理
  • SDK 可能出現的異常錯誤可以分成如下幾類:

    • 由日志服務端返回的錯誤。這類錯誤由日志服務端返回並由 SDK 處理。關於這類錯誤的詳細細節可以參考日志服務 API 的通用錯誤碼和各個 API 接口的具體說明。
    • 由 SDK 在向服務端發出請求時出現的網絡錯誤。這類錯誤包括網絡連接不通,服務端返回超時等。日志服務內部並未對此做任何重試邏輯,所以,您在使用 SDK 時需要自己定義相應的處理邏輯(重試請求或者直接報錯等)
    • 由 SDK 自身產生的、與平台及語言相關的錯誤,如內存溢出等。
  • 目前,各個語言 SDK 的實現都采取拋出異常的方式處理錯誤。具體原則如下:

    • 由如上第一或者第二類錯誤將會被 SDK 處理並包裝在統一的 LogException 類拋出給用戶處理
    • 由如上第三類錯誤不會被 SDK 處理,而是直接拋出平台及語言的 Native Exception 類給用戶處理
  • API錯誤重試

    • 在ILogHubProcessor的process方法中,方法返回空表示正常處理數據, 如果需要回滾到上個check point的點進行重試的話,可以return checkPointTracker.getCheckpoint(),但是這里有可能會造成重復消費
    • 自己增加重試策略,避免重復消費
參考文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM