分布式系統海量日志,如何獲取並進行各種分析得出實時或者非實時的分析結果
活動流數據:頁面訪問量(Page View)、被查看內容方面的信息以及搜索情況等內容。這種數據通常的處理方式是先把各種活動以日志的形式寫入某種文件,然后周期性地對這些文件進行統計分析。
運營數據指的是服務器的性能數據(CPU、IO使用率、請求時間、服務日志等等數據)。
常見的分布式日志收集系統:
linkedin的kafka(可以用來做消息隊列、流式處理(一般結合storm)、日志聚合等):
水平擴展和高吞吐率的消息系統,以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間復雜度的訪問性能,支持消息分區,分布式消費,保證每個partition內的消息順序傳輸。Scala編寫,push/pull,發布訂閱系統,producer(可以是web前端產生的Page View,或者是服務器日志,系統CPU、Memory等)向broker發送數據,即向topic發布消息,consumer訂閱每個topic,每個topic又分為多個partition(任一partition都可以被且只被一個消費者消費)便於管理和負載均衡,通過zookeeper協調。消費邏輯保存在客戶端,對消息進行分區,利用文件系統順序性提升效率,0-copy/send file系統調用提升效率,充分利用操作系統緩存,減少系統內用戶區和系統區的復制。
每條消息都被append到該Partition中,屬於順序寫磁盤,因此效率非常高,順序寫磁盤效率比隨機寫內存還要高
每個partition內的消息是有序的,而一個partition只能被一個消費者消費,因此Kafka能提供partition層面的消息有序,而傳統的隊列在多個consumer的情況下是完全無法保證有序的。
物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的所有消息和索引文件。
現代操作系統都樂於將空閑內存轉作磁盤緩存(頁面緩存),想不用都難;對於這樣的系統,他的數據在內存中保存了一份,同時也在OS的頁面緩存中保存了一份,這樣不但多了一個步驟還讓內存的使用率下降了一半;因此,Kafka決定直接使用頁面緩存;但是隨機寫入的效率很慢,為了維護彼此的關系順序還需要額外的操作和存儲,而線性的寫入可以避免這些,實際上,線性寫入(linear write)的速度大約是300MB/秒,但隨即寫入卻只有50k/秒,其中的差別接近10000倍。這樣,Kafka以頁面緩存為中間的設計在保證效率的同時還提供了消息的持久化,每個消費者自己維護當前讀取數據的offser(也可委托給zookeeper),以此可同時支持在線和離線的消費。
Kafka集群會保留所有的消息,無論其被消費與否,兩種策略刪除舊數據。一是基於時間,二是基於Partition文件大小。可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一周前的數據,也可在Partition文件超過1GB時刪除舊數據。
Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關
每一個Consumer Group保留一些metadata信息——當前消費的消息的position,也即offset。這個offset由Consumer控制。正常情況下Consumer會在消費完一條消息后遞增該offset。當然,Consumer也可將offset設成一個較小的值,重新消費一些消息。因為offet由Consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些消息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障。
Producer發送消息到broker時,會根據Paritition機制選擇將其存儲到哪一個Partition。如果Partition機制設置合理,所有消息可以均勻分布到不同的Partition里,這樣就實現了負載均衡。如果一個Topic對應一個文件,那這個文件所在的機器I/O將會成為這個Topic的性能瓶頸,而有了Partition后,不同的消息可以並行寫入不同broker的不同Partition里,極大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的默認Partition數量,也可在創建Topic時通過參數指定,同時也可以在Topic創建之后通過Kafka提供的工具修改。
在發送一條消息時,可以指定這條消息的key,Producer根據這個key和Partition機制來判斷應該將這條消息發送到哪個Parition。Paritition機制可以通過指定Producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中如果key可以被解析為整數則將對應的整數與Partition總數取余,該消息會被發送到該數對應的Partition。(每個Parition都會有個序號,序號從0開始)
使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
Kafka默認保證At least once,並且允許通過設置Producer異步提交來實現At most once。而Exactly once要求與外部存儲系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式。
cloudera的flume:
Flume采用了分層架構,由三層組成,分別為agent,collector和storage。其中Agent層每個機器部署一個進程,負責對單機的日志收集工作;Collector層部署在中心服務器上,負責接收Agent層發送的日志,並且將日志根據路由規則寫到相應的Store層中;Store層負責提供永久或者臨時的日志存儲服務,或者將日志流導向其它服務器
當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到數據
agent首先將event寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,可以重新發送。),Store on failure(這也是scribe采用的策略,當數據接收方crash時,將數據寫到本地,待恢復后,繼續發送),Best effort(數據發送到接收方后,不會進行確認)。Flume采用了三層架構,分別問agent,collector和storage,每一層均可以水平擴展。其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。
Source:
從數據發生器接收數據,並將接收的數據以Flume的event格式傳遞給一個或者多個通道channal,Flume提供多種數據接收的方式,比如Avro,Thrift,twitter等
Channel:
channal是一種短暫的存儲容器,它將從source處接收到的event格式的數據緩存起來,直到它們被sinks消費掉,它在source和sink間起着一共橋梁的作用,channal是一個完整的事務,這一點保證了數據在收發的時候的一致性. 並且它可以和任意數量的source和sink鏈接. 支持的類型有: JDBC channel , File System channel , Memort channel等.
sink:
sink將數據存儲到集中存儲器比如Hbase和HDFS,它從channals消費數據(events)並將其傳遞給目標地. 目標地可能是另一個sink,也可能HDFS,HBase.
美團的日志收集系統的模塊分解圖,詳解Agent, Collector和Bypass中的Source, Channel和Sink的關系
參考自 http://www.aboutyun.com/thread-8317-1-1.html
日志監控時如果機器死掉,則日志也不產生了,無影響,如果是agent進程死掉,則可以監控設置重啟,或日志持久化磁盤,重啟后重讀
中心服務器提供的是對等的且無差別的服務,且Agent訪問Collector做了LoadBalance和重試機制。所以當某個Collector無法提供服務時,Agent的重試策略會將數據發送到其它可用的Collector上面。所以整個服務不受影響。
Target可以配置雙目標,當hdfs不可用則臨時用FileChannel緩存,如果文件緩存太慢,則利用MemChannel傳遞數據減少延時
所有的events都被保存在Agent的Channel中,然后被發送到數據流中的下一個Agent或者最終的存儲服務中。那么一個Agent的Channel中的events什么時候被刪除呢?當且僅當它們被保存到下一個Agent的Channel中或者被保存到最終的存儲服務中。這就是Flume提供數據流中點到點的可靠性保證的最基本的單跳消息傳遞語義。Agent間的事務交換。Flume使用事務的辦法來保證event的可靠傳遞。Source和Sink分別被封裝在事務中,這些事務由保存event的存儲提供或者由Channel提供。這就保證了event在數據流的點對點傳輸中是可靠的。
facebook的scribe(epoll方式):
從各種日志源上收集數據,存儲到中央系統,集中統計分析處理,容錯性好。當后端的存儲系統crash時,scribe會將數據寫到本地磁盤上,當存儲系統恢復正常后,scribe將日志重新加載到存儲系統中
Agent是thrift client,向scribe發送數據用,Scribe接受agent發來的數據,根據配置將不同topic的數據發送給不同的對象,進行各種存儲
apache的chukwa:
Adapter這里沒有,封裝其他數據源,文件命令 系統參數 hadoop log等,HDFS初衷是支持大文件和小並發高速寫,日志系統是高並發低速寫和大量小文件存儲,所以增加collector和agent,agent給adapter提供各種服務,包括啟動關閉adapter,將數據通過HTTP傳遞給Collector,定期記錄adapter的狀態便於故障恢復。collector對多個數據源發來的數據進行合並,加載到HDFS,隱藏HDFS細節。
Splunk:
好用,貴
淘寶Time Tunnel:
基於thrift通訊框架搭建的實時數據傳輸平台,具有高性能、實時性、順序性、高可靠性、高可用性、可擴展性等特點(基於Hbase)
logstash:
ELK(ElasticSearch, LogStash, Kibana)
input 數據輸入端,可以接收來自任何地方的源數據。
Filter 數據中轉層,主要進行格式處理,數據類型轉換、數據過濾、字段添加,修改等。
output 是logstash工作的最后一個階段,負責將數據輸出到指定位置,兼容大多數應用。
參考:
http://www.mincoder.com/article/3942.shtml