日志收集系統架構
1.項目背景
a. 每個系統都有日志,當系統出現問題時,需要通過日志解決問題
b. 當系統機器比較少時,登陸到服務器上查看即可滿足
c. 當系統機器規模巨大,登陸到機器上查看幾乎不現實
2.解決方案
a. 把機器上的日志實時收集,統一的存儲到中心系統
b. 然后再對這些日志建立索引,通過搜索即可以找到對應日志
c. 通過提供界面友好的web界面,通過web即可以完成日志搜索
面臨的問題
a. 實時日志量非常大,每天幾十億條
b. 日志准實時收集,延遲控制在分鍾級別
c. 能夠水平可擴展
ELK介紹
•官網https://www.elastic.co/cn/
• 中文指南https://www.gitbook.com/book/chenryn/elk-stack-guide-cn/details
• ELKStack (5.0版本之后)--> ElasticStack == (ELKStack + Beats)
• ELK Stack包含:ElasticSearch、Logstash、Kibana
• ElasticSearch是一個搜索引擎,用來搜索、分析、存儲日志。它是分布式的,也就是說可以橫向擴容,可以自動發現,索引自動分片,總之很強大。文檔https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html
• Logstash用來采集日志,把日志解析為json格式交給ElasticSearch。
• Kibana是一個數據可視化組件,把處理后的結果通過web界面展示
• Beats在這里是一個輕量級日志采集器,其實Beats家族有5個成員
• 早期的ELK架構中使用Logstash收集、解析日志,但是Logstash對內存、cpu、io等資源消耗比較高。相比 Logstash,Beats所占系統的CPU和內存幾乎可以忽略不計
• x-pack對ElasticStack提供了安全、警報、監控、報表、圖表於一身的擴展包,是收費的
elk方案問題
a. 運維成本高,每增加一個日志收集,都需要手動修改配置
b. 監控缺失,無法准確獲取logstash的狀態
c. 無法做定制化開發以及維護
日志收集系統設計
Kafka消息隊列
數據解耦
a. Log Agent,日志收集客戶端,用來收集服務器上的日志
b. Kafka,高吞吐量的分布式隊列,linkin開發,apache頂級開源項目
c. ES,elasticsearch,開源的搜索引擎,提供基於http restful的web接口
d. Hadoop,分布式計算框架,能夠對大量數據進行分布式處理的平台
zookeeper
Zookeeper 作為一個分布式的服務框架,主要用來解決分布式集群中應用系統的一致性問題,它能提供基於類似於文件系統的目錄節點樹方式的數據存儲, Zookeeper 作用主要是用來維護和監控存儲的數據的狀態變化,通過監控這些數據狀態的變化,從而達到基於數據的集群管理
簡單的說,zookeeper=文件系統+通知機制
a. 安裝JDK,從oracle下載最新的SDK安裝
b. 安裝zookeeper3.3.6,下載地址:http://apache.fayea.com/zookeeper/
1)mv conf/zoo_sample.cfg conf/zoo.cfg
2)編輯 conf/zoo.cfg,修改dataDir
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper/data
# the port at which the clients will connect
clientPort=2181
dataLogDir=/tmp/zookeeper/log
3)vim /etc/profile
export PATH=$PATH:/usr/local/zookeeper/bin
source /etc/profile
運行:
[root@greg02 zookeeper]#zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
kafka
1.打開鏈接:http://kafka.apache.org/downloads.html
下載https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.2/kafka_2.12-0.11.0.2.tgz
2.打開config目錄下的server.properties, 修改log.dirs為D:\kafka_logs,修改advertised.host.name=服務器ip
3.啟動kafka
[root@greg02 kafka]#kafka-server-start.sh config/server.properties
kafka消費者開啟
[root@greg02 kafka]#kafka-console-consumer.sh --topic nginx_log --zookeeper 127.0.0.1 2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2018-02-05 18:30:22,451] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
[2018-02-05 18:30:22,597] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
go kafka
package main
import (
"fmt"
"time"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
client, err := sarama.NewSyncProducer([]string{"192.168.179.130:9092"}, config)
if err != nil {
fmt.Println("producer close, err:", err)
return
}
defer client.Close()
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test, my message is good")
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
time.Sleep(10 * time.Millisecond)
}
linux tail命令
-f 用於循環讀取文件的內容,監視文件的增長
-F 與-f類似,區別在於當將監視的文件刪除重建后-F仍能監視該文件內容-f則不行,-F有重試的功能,會不斷重試
package main
import (
"fmt"
"github.com/hpcloud/tail"
"time"
)
func main() {
filename := "/root/passwd"
tails, err := tail.TailFile(filename, tail.Config{
ReOpen: true,
Follow: true,
//Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
})
if err != nil {
fmt.Println("tail file err:", err)
return
}
var msg *tail.Line
var ok bool
for true {
msg, ok = <-tails.Lines
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep(100 * time.Millisecond)
continue
}
fmt.Println("msg:", msg)
}
}
配置文件庫使用
-
初始化配置庫
iniconf, err := NewConfig("ini", "testini.conf") if err != nil { t.Fatal(err) }
-
讀取配置項
• String(key string) string
• Int(key string) (int, error)
• Int64(key string) (int64, error)
• Bool(key string) (bool, error)
• Float(key string) (float64, error)
cofig的go實現
package main
import (
"fmt"
"github.com/astaxie/beego/config"
)
func main() {
conf, err := config.NewConfig("ini", "./logagent.conf")
if err != nil {
fmt.Println("new config failed, err:", err)
return
}
port, err := conf.Int("server::port")
if err != nil {
fmt.Println("read server:port failed, err:", err)
return
}
fmt.Println("Port:", port)
log_level := conf.String("logs::log_level")
if len(log_level) == 0 {
log_level = "debug"
}
fmt.Println("log_level:", log_level)
log_path := conf.String("logs::log_path")
fmt.Println("log_path:", log_path)
}
日志庫的使用
-
配置log組件
config := make(map[string]interface{}) config["filename"] = "./logs/logcollect.log" config["level"] = logs.LevelDebug configStr, err := json.Marshal(config) if err != nil { fmt.Println("marshal failed, err:", err) return }
-
初始化日志組件
logs.SetLogger(“file”, string(configStr))
寫日志
package main import ( "encoding/json" "fmt" "github.com/astaxie/beego/logs" ) func main() { config := make(map[string]interface{}) config["filename"] = "/root/logs/logcollect.log" config["level"] = logs.LevelDebug configStr, err := json.Marshal(config) if err != nil { fmt.Println("marshal failed, err:", err) return } logs.SetLogger(logs.AdapterFile, string(configStr)) logs.Debug("this is a test, my name is %s", "stu01") logs.Trace("this is a trace, my name is %s", "stu02") logs.Warn("this is a warn, my name is %s", "stu03") }