Main contents of this section:
1. Log collection system design
2. Log collection client development
1. Project background
a. Every system produces logs, and when a system has a problem, the logs are what you need to troubleshoot it
b. When there are only a few machines, logging in to the server and reading the logs is good enough
c. When the fleet is very large, logging in to each machine is practically impossible
2. Solution
a. Collect the logs on each machine in real time and store them centrally in a central system
b. Build indexes over these logs so that the relevant entries can be found by searching
c. Provide a friendly web UI so that log search can be done entirely through the browser
3. Challenges
a. The real-time log volume is huge, on the order of billions of entries per day
b. Logs must be collected in near real time, with latency kept at the minute level
c. The system must scale horizontally
4. The industry solution: ELK
Log collection system architecture (diagram)
Problems with this approach:
a. High operational cost: every new log source requires a manual configuration change
b. No monitoring: the state of Logstash cannot be obtained accurately
c. Customized development and maintenance are not possible
5. Log collection system design
The components:
a. Log Agent: the log collection client, used to collect logs from the servers
b. Kafka: a high-throughput distributed message queue, originally developed at LinkedIn, now a top-level Apache open source project
c. ES (Elasticsearch): an open-source search engine that exposes an HTTP RESTful interface (a minimal query sketch follows this list)
d. Hadoop: a distributed computing framework, a platform for distributed processing of large data sets
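As a quick illustration of point c, Elasticsearch can be queried over plain HTTP with nothing but the standard library. This is only a minimal sketch: the address 192.168.30.136:9200, the index name nginx_log, and the query string are assumptions for illustration, not part of the project code.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Hypothetical ES address and index; adjust to your own environment.
	// A simple query-string search against an assumed "nginx_log" index.
	url := "http://192.168.30.136:9200/nginx_log/_search?q=status:404"

	resp, err := http.Get(url)
	if err != nil {
		fmt.Println("query es failed, err:", err)
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read response failed, err:", err)
		return
	}
	// The response is a JSON document containing the matching hits and metadata.
	fmt.Println(string(body))
}

The JSON hits returned here are what a web UI layer would render for the user.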
6. Kafka use cases
1. Asynchronous processing: move non-critical steps off the critical path to improve response time and robustness
2. Application decoupling, via the message queue
3. Traffic peak shaving
7. ZooKeeper use cases
1. Service registration & service discovery
2. Configuration center
3. Distributed lock
- ZooKeeper is strongly consistent
- If multiple clients create the same znode on ZooKeeper at the same time, only one of them succeeds (a minimal sketch follows below)
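Point 3 follows directly from the two properties above: because only one client can create a given znode, the znode itself acts as the lock. Below is a minimal sketch using the github.com/samuel/go-zookeeper/zk client, which is not used elsewhere in this article; the server address and the /my_lock path are assumptions.

package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	// Hypothetical ZooKeeper address; adjust to your environment.
	conn, _, err := zk.Connect([]string{"192.168.30.136:2181"}, 5*time.Second)
	if err != nil {
		fmt.Println("connect zk failed, err:", err)
		return
	}
	defer conn.Close()

	// Every contender tries to create the same ephemeral znode.
	// Because ZooKeeper is strongly consistent, only one Create succeeds;
	// the others get zk.ErrNodeExists and know they did not get the lock.
	path, err := conn.Create("/my_lock", []byte("owner-1"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	if err == zk.ErrNodeExists {
		fmt.Println("lock is held by someone else")
		return
	}
	if err != nil {
		fmt.Println("create znode failed, err:", err)
		return
	}
	fmt.Println("acquired lock at", path)
}

Using an ephemeral node means the lock is released automatically if the holder's session dies.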
8. Installing Kafka
See this blog post: https://www.cnblogs.com/xuejiale/p/10505391.html
9. Log agent design
10. Log agent workflow
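The workflow diagram is not reproduced here, but the core of the agent is a loop that tails a log file and forwards each new line to Kafka. The sketch below simply stitches together the tail and sarama examples from the following sections; the file path, broker address, and topic are placeholder values, and the real project code (linked in section 15) is organized differently.

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/hpcloud/tail"
)

func main() {
	// Tail the log file (path is a placeholder).
	tails, err := tail.TailFile("./my.log", tail.Config{
		ReOpen:    true,
		Follow:    true,
		MustExist: false,
		Poll:      true,
	})
	if err != nil {
		fmt.Println("tail file err:", err)
		return
	}

	// Kafka sync producer (broker address is a placeholder).
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true
	producer, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
	if err != nil {
		fmt.Println("new producer failed, err:", err)
		return
	}
	defer producer.Close()

	// Core loop: every line read from the file becomes a Kafka message.
	for line := range tails.Lines {
		msg := &sarama.ProducerMessage{
			Topic: "nginx_log",
			Value: sarama.StringEncoder(line.Text),
		}
		if _, _, err := producer.SendMessage(msg); err != nil {
			fmt.Println("send message failed, err:", err)
		}
	}
}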
11. Kafka example
Import the third-party package first:
github.com/Shopify/sarama
My Kafka and ZooKeeper are both installed on Linux (CentOS 6.5, ip: 192.168.30.136):

package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	// Producer configuration: wait for all in-sync replicas, pick a random partition.
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	// Connect to the Kafka broker on the Linux machine.
	client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
	if err != nil {
		fmt.Println("producer close, err:", err)
		return
	}
	defer client.Close()

	// Send one message per second to the nginx_log topic.
	for {
		msg := &sarama.ProducerMessage{}
		msg.Topic = "nginx_log"
		msg.Value = sarama.StringEncoder("this is a good test, my message is good")

		pid, offset, err := client.SendMessage(msg)
		if err != nil {
			fmt.Println("send message failed,", err)
			return
		}

		fmt.Printf("pid:%v offset:%v\n", pid, offset)
		time.Sleep(time.Second)
	}
}
Note: on Shopify/sarama's synchronous and asynchronous producers, see https://www.jianshu.com/p/666d2604e8f8
The program running on Windows sends data to Kafka on Linux:
Kafka on Linux receives the data:
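As noted above, sarama also provides an asynchronous producer. For comparison, here is a minimal sketch against the same assumed broker and topic: messages are pushed into Input() without blocking, and the results come back later on the Successes() and Errors() channels.

package main

import (
	"fmt"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	// Assumed broker address, matching the sync example above.
	producer, err := sarama.NewAsyncProducer([]string{"192.168.30.136:9092"}, config)
	if err != nil {
		fmt.Println("new async producer failed, err:", err)
		return
	}
	defer producer.AsyncClose()

	// Send a message without blocking.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "nginx_log",
		Value: sarama.StringEncoder("async hello"),
	}

	// Results arrive asynchronously on separate channels.
	select {
	case msg := <-producer.Successes():
		fmt.Printf("sent ok, partition:%d offset:%d\n", msg.Partition, msg.Offset)
	case err := <-producer.Errors():
		fmt.Println("send failed, err:", err)
	case <-time.After(5 * time.Second):
		fmt.Println("timed out waiting for result")
	}
}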
Another Kafka produce-and-consume example:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// Create a new sarama config instance.
	config := sarama.NewConfig()
	// WaitForAll waits for all in-sync replicas to commit before responding.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	// new producer
	client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
	if err != nil {
		fmt.Println("producer close, err:", err)
		return
	}
	defer client.Close()

	// new message
	msg := &sarama.ProducerMessage{}
	msg.Topic = "food"
	msg.Key = sarama.StringEncoder("fruit")
	msg.Value = sarama.StringEncoder("apple")

	// send message
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send message failed,", err)
		return
	}
	fmt.Printf("pid: %v, offset:%v\n", pid, offset)

	// new message
	msg2 := &sarama.ProducerMessage{}
	msg2.Topic = "food"
	msg2.Key = sarama.StringEncoder("fruit")
	msg2.Value = sarama.StringEncoder("orange")

	// send message
	pid2, offset2, err := client.SendMessage(msg2)
	if err != nil {
		fmt.Println("send message failed,", err)
		return
	}
	fmt.Printf("pid2: %v, offset2:%v\n", pid2, offset2)

	fmt.Println("Produce success.")
}

package main

import (
	"fmt"
	"sync"

	"github.com/Shopify/sarama"
)

var wg sync.WaitGroup

func main() {
	consumer, err := sarama.NewConsumer([]string{"192.168.30.136:9092"}, nil)
	if err != nil {
		fmt.Println("consumer connect error:", err)
		return
	}
	fmt.Println("connect success...")
	defer consumer.Close()

	// List all partitions of the "food" topic.
	partitions, err := consumer.Partitions("food")
	if err != nil {
		fmt.Println("get partitions failed, err:", err)
		return
	}

	// Consume each partition from the oldest offset in its own goroutine.
	for _, p := range partitions {
		partitionConsumer, err := consumer.ConsumePartition("food", p, sarama.OffsetOldest)
		if err != nil {
			fmt.Println("partitionConsumer err:", err)
			continue
		}
		wg.Add(1)
		go func() {
			for m := range partitionConsumer.Messages() {
				fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
			}
			wg.Done()
		}()
	}
	wg.Wait()

	fmt.Println("Consumer success.")
}
12. Using the tailf component
Import the third-party package first:
github.com/hpcloud/tail

package main

import (
	"fmt"
	"time"

	"github.com/hpcloud/tail"
)

func main() {
	filename := "F:\\Go\\project\\src\\go_dev\\logCollect\\tailf\\my.log"
	tails, err := tail.TailFile(filename, tail.Config{
		ReOpen:    true,
		Follow:    true,
		//Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	})
	if err != nil {
		fmt.Println("tail file err:", err)
		return
	}

	var msg *tail.Line
	var ok bool
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		// msg.Text holds the text of the line that was read.
		fmt.Println("msg:", msg.Text)
	}
}
Content of my.log (Unix line endings):
On Windows, when the log file (my.log) used by the code above is in Windows format, the program output is as follows:
After converting the file to Unix format with Notepad++, the output is as follows:
Note: the last line must end with a newline, otherwise that line cannot be read.
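Because tail only delivers complete, newline-terminated lines, a convenient way to test is a small writer that keeps appending such lines to my.log (the same idea as the writeLogTest/log.go mentioned in section 15). This is only an illustrative sketch and the path is a placeholder.

package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	// Open (or create) the log file in append mode; the path is a placeholder.
	f, err := os.OpenFile("./my.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Println("open file failed, err:", err)
		return
	}
	defer f.Close()

	for i := 0; ; i++ {
		// Each line ends with '\n' so the tail example can read it.
		fmt.Fprintf(f, "test log line %d\n", i)
		time.Sleep(time.Second)
	}
}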
13. Using the config library
Import the third-party package first:
github.com/astaxie/beego/config
1) Initialize the config library
iniconf, err := NewConfig("ini", "testini.conf")
if err != nil {
	log.Fatal(err)
}
2) Read configuration items
String(key string) string
Int(key string) (int, error)
Int64(key string) (int64, error)
Bool(key string) (bool, error)
Float(key string) (float64, error)
For example:
iniconf.String("server::listen_ip")
iniconf.Int("server::listen_port")

[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[logs]
log_level=debug
log_path=./logs/logagent.log

[collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log

package main

import (
	"fmt"

	"github.com/astaxie/beego/config"
)

func main() {
	conf, err := config.NewConfig("ini", "./logcollect.conf")
	if err != nil {
		fmt.Println("new config failed, err:", err)
		return
	}

	port, err := conf.Int("server::listen_port")
	if err != nil {
		fmt.Println("read server:port failed, err:", err)
		return
	}
	fmt.Println("Port:", port)

	// String returns only a string (no error value), so there is nothing to check here.
	log_level := conf.String("log::log_level")
	fmt.Println("log_level:", log_level)

	log_path := conf.String("log::log_path")
	fmt.Println("log_path:", log_path)
}
Config file content:
[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[log]
log_level=debug
log_path=./logs/logagent.log

[collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log
Execution result:
14. Using the logging library
Import the third-party package first:
github.com/astaxie/beego/logs
1) Configure the log component
config := make(map[string]interface{})
config["filename"] = "./logs/logcollect.log"
config["level"] = logs.LevelDebug

configStr, err := json.Marshal(config)
if err != nil {
	fmt.Println("marshal failed, err:", err)
	return
}
2) Initialize the log component
logs.SetLogger("file", string(configStr))

package main

import (
	"encoding/json"
	"fmt"

	"github.com/astaxie/beego/logs"
)

func main() {
	config := make(map[string]interface{})
	config["filename"] = "./logcollect.log"
	config["level"] = logs.LevelDebug

	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("marshal failed, err:", err)
		return
	}

	logs.SetLogger(logs.AdapterFile, string(configStr))

	logs.Debug("this is a test, my name is %s", "stu01")
	logs.Trace("this is a trace, my name is %s", "stu02")
	logs.Warn("this is a warn, my name is %s", "stu03")
}
15. Overall implementation of the log collection project
Development environment: Windows, go version go1.12.1 windows/amd64, kafka_2.11-2.0.0, zookeeper-3.4.12.
A first demo, version V1, is implemented here:
(1) Code structure diagram
(2) The code is on my GitHub: https://github.com/XJL635438451/logCollectProject/tree/master
(3) How to run it
1) Install Go, Kafka, and ZooKeeper first;
2) Start ZooKeeper first, then Kafka; the commands are listed below;
Start ZooKeeper:
.\zkServer.cmd

Start Kafka:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

Create a topic:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaTest

Start a producer:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kafkaTest

Start a consumer:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic nginx_log --from-beginning
3) If you do not want to write the log file yourself, you can run writeLogTest/log.go from the repo and then run main.exe (if you modify the code you need to rebuild it);
4) You can start a Kafka consumer to check whether the logs are being written to Kafka, using the consumer command shown above; if everything works, you will see the log lines continuously scrolling in the consumer.