此文檔只關心消費接入,不關心日志接入,只關心消費如何接入,可直接跳轉到【sdk消費接入]
SLS簡介
- 日志服務:
- 日志服務(Log Service,簡稱 LOG)是針對日志類數據的一站式服務,在阿里巴巴集團經歷大量大數據場景錘煉而成。您無需開發就能快捷完成日志數據采集、消費、投遞以及查詢分析等功能,提升運維、運營效率,建立 DT 時代海量日志處理能力
- 功能
- 實時采集與消費(LogHub)
- 投遞數倉(LogShipper)
- 查詢與實時分析(Search/Analytics)
接入消費流程
賬號問題
- 如果消費自己的日志,直接使用自己阿里雲賬號的key
- 如果消費他人提供的日志,需要別人創建的子賬號或者賬號(推薦子賬號,無安全問題)中的key,使用自己賬號無法連接通
接入點EndPoint
- 公有雲(彈外)
- 公有雲(彈外)入口參見列表
消費接入(java)
概念
對象 | 明細 |
---|---|
Log | 日志、日志組表示等基本概念 |
Project | 項目 |
Config | 配置 |
LogStore | 日志庫 |
Index | 索引 |
Shard | 分區 |
ConsumerGroup | 消費組 |
配置
就如同使用 API 和日志服務服務端交互一樣,使用 SDK 也需要指定一些基本配置。目前,所有語言的 SDK 都定義了一個 Client 類作為入口類,這些基本配置信息在該入口類的構造時指定。
具體包括如下幾項:
- 服務入口(Endpoint):確認 Client 需要訪問的服務入口
- 當使用 SDK 時,首先需要明確訪問的日志服務 Project 所在 Region(如“華東 1 (杭州)”、“華北 1 (青島)”等),然后選擇與其匹配的日志服務入口初始化 Client。該服務入口與 API 中的 服務入口 定義一致
- 當選擇 Client 的 Endpoint 時,必須要保證您需要訪問的 Project 的 Region 和 Endpoint 對應的 Region 一致,否則 SDK 將無法訪問您指定的 Project
- 由於 Client 實例只能在構造時指定該服務入口,如果需要訪問不同 Region 里的 Project,則需要用不同的 Endpoint 構建不同的 Client 實例
- 目前,所有 API 的服務入口僅支持 HTTP 協議。
- 如果在阿里雲 ECS 虛擬機內使用 SDK,您還可以使用內網 Endpoint 避免公網帶寬開銷,具體請參考 服務入口
- 阿里雲訪問秘鑰(AccessKeyId/AccessKeySecret):指定 Client 訪問日志服務時使用的訪問秘鑰
skd消費接入
原始接入
- 參見參考文檔,要消費日志服務中的數據,請盡量不要直接使用SDK的拉數據接口,我們提供了一個高級消費庫消費組消費,該庫屏蔽了日志服務的實現細節,並且提供了負載均衡、按序消費等高級功能
消費組接入
-
同一個消費組下面的消費者名稱必須不同,否則相同的消費者會同時消費logstore同份數據,造成數據重復
-
協同消費庫(Consumer Library)是對日志服務中日志進行消費的高級模式,提供了消費組(ConsumerGroup)的概念對消費端進行抽象和管理,和直接使用SDK進行數據讀取的區別在於,用戶無需關心日志服務的實現細節,只需要專注於業務邏輯,另外,消費者之間的負載均衡、failover等用戶也都無需關心
-
消費組(ConsumerGroup)
- 一個消費組由多個消費者構成,同一個消費組下面的消費者共同消費一個logstore中的數據,消費者之間不會重復消費數據
-
消費組(Consumer)
- 消費組的構成單元,實際承擔消費任務,同一個消費組下面的消費者名稱必須不同
-
shared消費組、消費組關系
- 一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者
- 每個shard只會分配到一個消費者
- 一個消費者可以同時擁有多個shard
- 新的消費者加入一個消費組,這個消費組下面的shard從屬關系會調整,以達到消費負載均衡的目的,但是上面的分配原則不會變,分配過程對用戶透明
- 一個logstore下面會有多個shard,協同消費庫的功能就是將shard分配給一個消費組下面的消費者
-
maven
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.15</version>
</dependency>
阿里雲client依賴log4j,如果項目中使用的logback,需要增加轉換log4j到logback的轉換
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.25</version>
</dependency>
java文件
- main
public class Main {
// 日志服務域名,根據實際情況填寫
private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
// 日志服務項目名稱,根據實際情況填寫
private static String sProject = "ali-cn-hangzhou-sls-admin";
// 日志庫名稱,根據實際情況填寫
private static String sLogstore = "sls_operation_log";
// 消費組名稱,根據實際情況填寫
private static String sConsumerGroup = "consumerGroupX";
// 消費數據的ak,根據實際情況填寫
private static String sAccessKeyId = "";
private static String sAccessKey = "";
public static void main(String []args) throws LogHubClientWorkerException, InterruptedException
{
// 第二個參數是消費者名稱,同一個消費組下面的消費者名稱必須不同,可以使用相同的消費組名稱,不同的消費者名稱在多台機器上啟動多個進程,來均衡消費一個Logstore,這個時候消費者名稱可以使用機器ip來區分。第9個參數(maxFetchLogGroupSize)是每次從服務端獲取的LogGroup數目,使用默認值即可,如有調整請注意取值范圍(0,1000]
LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
//Thread運行之后,Client Worker會自動運行,ClientWorker擴展了Runnable接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
//調用worker的Shutdown函數,退出消費實例,關聯的線程也會自動停止。
worker.shutdown();
//ClientWorker運行過程中會生成多個異步的Task,Shutdown之后最好等待還在執行的Task安全退出,建議sleep 30s。
Thread.sleep(30 * 1000);
}
}
- SampleLogHubProcessor
public class SampleLogHubProcessor implements ILogHubProcessor
{
private int mShardId;
// 記錄上次持久化 check point 的時間
private long mLastCheckTime = 0;
public void initialize(int shardId)
{
mShardId = shardId;
}
// 消費數據的主邏輯,這里面的所有異常都需要捕獲,不能拋出去。
public String process(List<LogGroupData> logGroups,
ILogHubCheckPointTracker checkPointTracker)
{
// 這里簡單的將獲取到的數據打印出來
for(LogGroupData logGroup: logGroups){
FastLogGroup flg = logGroup.GetFastLogGroup();
System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
System.out.println("Tags");
for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
FastLogTag logtag = flg.getLogTags(tagIdx);
System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
}
for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
FastLog log = flg.getLogs(lIdx);
System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
FastLogContent content = log.getContents(cIdx);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
}
}
}
long curTime = System.currentTimeMillis();
// 每隔 30 秒,寫一次 check point 到服務端,如果 30 秒內,worker crash,
// 新啟動的 worker 會從上一個 checkpoint 其消費數據,有可能有少量的重復數據
if (curTime - mLastCheckTime > 30 * 1000)
{
try
{
//參數true表示立即將checkpoint更新到服務端,為false會將checkpoint緩存在本地,后台默認隔60s會將checkpoint刷新到服務端。
checkPointTracker.saveCheckPoint(true);
}
catch (LogHubCheckPointException e)
{
e.printStackTrace();
}
mLastCheckTime = curTime;
}
return null;
}
// 當 worker 退出的時候,會調用該函數,用戶可以在此處做些清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker)
{
//將消費斷點保存到服務端。
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
}
}
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory
{
public ILogHubProcessor generatorProcessor()
{
// 生成一個消費實例
return new SampleLogHubProcessor();
}
}
- 上述代碼,工廠類可以用lambda替換
- client繼承Runnable,必須以thread方式啟動
- client原理:是啟動線程,底層定時發送心跳給服務端,拿到要消費的必要信息,異步提交http請求任務(線程池),請求處理數據。所以調用client.shutdown,方法並不能立馬把所有任務關閉,最好有個時間差,同時client中運行線程標記是否關閉的變量不是線程安全的,所以關閉的時候,依然有可能提交請求任務處理
錯誤處理
-
SDK 可能出現的異常錯誤可以分成如下幾類:
- 由日志服務端返回的錯誤。這類錯誤由日志服務端返回並由 SDK 處理。關於這類錯誤的詳細細節可以參考日志服務 API 的通用錯誤碼和各個 API 接口的具體說明。
- 由 SDK 在向服務端發出請求時出現的網絡錯誤。這類錯誤包括網絡連接不通,服務端返回超時等。日志服務內部並未對此做任何重試邏輯,所以,您在使用 SDK 時需要自己定義相應的處理邏輯(重試請求或者直接報錯等)
- 由 SDK 自身產生的、與平台及語言相關的錯誤,如內存溢出等。
-
目前,各個語言 SDK 的實現都采取拋出異常的方式處理錯誤。具體原則如下:
- 由如上第一或者第二類錯誤將會被 SDK 處理並包裝在統一的 LogException 類拋出給用戶處理
- 由如上第三類錯誤不會被 SDK 處理,而是直接拋出平台及語言的 Native Exception 類給用戶處理
-
API錯誤重試
- 在ILogHubProcessor的process方法中,方法返回空表示正常處理數據, 如果需要回滾到上個check point的點進行重試的話,可以return checkPointTracker.getCheckpoint(),但是這里有可能會造成重復消費
- 自己增加重試策略,避免重復消費