Go語言學習之11 日志收集系統kafka庫實戰


本節主要內容:

1. 日志收集系統設計
2. 日志客戶端開發

1. 項目背景
    a. 每個系統都有日志,當系統出現問題時,需要通過日志解決問題
    b. 當系統機器比較少時,登陸到服務器上查看即可滿足
    c. 當系統機器規模巨大,登陸到機器上查看幾乎不現實

2. 解決方案
    a. 把機器上的日志實時收集,統一的存儲到中心系統
    b. 然后再對這些日志建立索引,通過搜索即可以找到對應日志
    c. 通過提供界面友好的web界面,通過web即可以完成日志搜索

3. 面臨的問題
    a. 實時日志量非常大,每天幾十億條
    b. 日志准實時收集,延遲控制在分鍾級別
    c. 能夠水平可擴展

4. 業界方案ELK

    日志收集系統架構

    該方案問題:

    a. 運維成本高,每增加一個日志收集,都需要手動修改配置
    b. 監控缺失,無法准確獲取logstash的狀態
    c. 無法做定制化開發以及維護

6. 日志收集系統設計

 

    各組件介紹:
    a. Log Agent,日志收集客戶端,用來收集服務器上的日志
    b. Kafka,高吞吐量的分布式隊列,linkin開發,apache頂級開源項目
    c. ES,elasticsearch,開源的搜索引擎,提供基於http restful的web接口
    d. Hadoop,分布式計算框架,能夠對大量數據進行分布式處理的平台

7. kafka應用場景
    1. 異步處理, 把非關鍵流程異步化,提高系統的響應時間和健壯性

 

     2. 應用解耦,通過消息隊列

    3. 流量削峰3. 流量削峰

 

 8. zookeeper應用場景

     1. 服務注冊&服務發現

 

 

     2. 配置中心

    3. 分布式鎖

  • Zookeeper是強一致的
  • 多個客戶端同時在Zookeeper上創建相同znode,只有一個創建成功

 9. 安裝kafka

     見博客:https://www.cnblogs.com/xuejiale/p/10505391.html

10. log agent設計

11. log agent流程

 

11. kafka示例

      先導入第三方包:

github.com/Shopify/sarama

     我的kafka和ZooKeeper都安裝在Linux(Centos6.5,ip: 192.168.30.136)上:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "time"
 6     "github.com/Shopify/sarama"
 7 )
 8 
 9 func main() {
10 
11     config := sarama.NewConfig()
12     config.Producer.RequiredAcks = sarama.WaitForAll
13     config.Producer.Partitioner = sarama.NewRandomPartitioner
14     config.Producer.Return.Successes = true
15 
16     client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
17     if err != nil {
18         fmt.Println("producer close, err:", err)
19         return
20     }
21 
22     defer client.Close()
23     for {
24         msg := &sarama.ProducerMessage{}
25         msg.Topic = "nginx_log"
26         msg.Value = sarama.StringEncoder("this is a good test, my message is good")
27 
28         pid, offset, err := client.SendMessage(msg)
29         if err != nil {
30             fmt.Println("send message failed,", err)
31             return
32         }
33 
34         fmt.Printf("pid:%v offset:%v\n", pid, offset)
35         time.Sleep(time.Second)
36     }
37 }
kafka示例

   注意:Shopify/sarama的同步/異步producer,https://www.jianshu.com/p/666d2604e8f8

    Windows啟動程序往Linux上的kafka發送數據:

    Linux上的kafka接收數據:

  再來看一個kafka生產和消費示例:

 1 package main
 2 
 3 import (
 4     "fmt"
 5     "github.com/Shopify/sarama"
 6 )
 7 
 8 func main() {
 9     // 新建一個arama配置實例
10     config := sarama.NewConfig()
11     // WaitForAll waits for all in-sync replicas to commit before responding.
12     config.Producer.RequiredAcks = sarama.WaitForAll
13     // NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
14     config.Producer.Partitioner = sarama.NewRandomPartitioner
15     config.Producer.Return.Successes = true
16 
17     // new producer
18     client, err := sarama.NewSyncProducer([]string{"192.168.30.136:9092"}, config)
19     if err != nil {
20         fmt.Println("producer close, err:", err)
21         return
22     }
23     defer client.Close()
24 
25     // new message
26     msg := &sarama.ProducerMessage{}
27     msg.Topic = "food"
28     msg.Key = sarama.StringEncoder("fruit")
29     msg.Value = sarama.StringEncoder("apple")
30 
31     // send message
32     pid, offset, err := client.SendMessage(msg)
33     if err != nil {
34         fmt.Println("send message failed,", err)
35         return
36     }
37     fmt.Printf("pid: %v, offset:%v\n", pid, offset)
38 
39     // new message
40     msg2 := &sarama.ProducerMessage{}
41     msg2.Topic = "food"
42     msg2.Key = sarama.StringEncoder("fruit")
43     msg2.Value = sarama.StringEncoder("orange")
44 
45     // send message
46     pid2, offset2, err := client.SendMessage(msg2)
47     if err != nil {
48         fmt.Println("send message failed,", err)
49         return
50     }
51     fmt.Printf("pid2: %v, offset2:%v\n", pid2, offset2)
52 
53     fmt.Println("Produce success.")
54 }
produce
 1 package main
 2 
 3 import (
 4     "sync"
 5     "github.com/Shopify/sarama"
 6     "fmt"
 7 )
 8 
 9 var wg sync.WaitGroup
10 
11 func main() {
12     consumer, err := sarama.NewConsumer([]string{"192.168.30.136:9092"}, nil)
13     if err != nil {
14         fmt.Println("consumer connect error:", err)
15         return
16     }
17     fmt.Println("connnect success...")
18     defer consumer.Close()
19 
20     partitions, err := consumer.Partitions("food")
21     if err != nil {
22         fmt.Println("geet partitions failed, err:", err)
23         return
24     }
25 
26     for _, p := range partitions {
27         partitionConsumer, err := consumer.ConsumePartition("food", p, sarama.OffsetOldest)
28         if err != nil {
29             fmt.Println("partitionConsumer err:", err)
30             continue
31         }
32         wg.Add(1)
33         go func(){
34             for m := range partitionConsumer.Messages() {
35                 fmt.Printf("key: %s, text: %s, offset: %d\n", string(m.Key), string(m.Value), m.Offset)
36             }
37             wg.Done()
38         }()
39     }
40     wg.Wait()
41 
42     fmt.Println("Consumer success.")
43 }
consumer

12. tailf組件使用

    先導入第三方包:

github.com/hpcloud/tail
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "github.com/hpcloud/tail"
 6     "time"
 7 )
 8 func main() {
 9     filename := "F:\\Go\\project\\src\\go_dev\\logCollect\\tailf\\my.log"
10     tails, err := tail.TailFile(filename, tail.Config{
11         ReOpen:    true,
12         Follow:    true,
13         //Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
14         MustExist: false,
15         Poll:      true,
16     })
17     if err != nil {
18         fmt.Println("tail file err:", err)
19         return
20     }
21     var msg *tail.Line
22     var ok bool
23     for {
24         msg, ok = <-tails.Lines
25         if !ok {
26             fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
27             time.Sleep(100 * time.Millisecond)
28             continue
29         }
30         fmt.Println("msg:", msg)
31     }
32 }
tailf示例代碼

    my.log文件內容(unix格式):

    在Windows上,當我的上面代碼里日志文件(my.log)為Windows格式,代碼執行結果如下:

    當時用notepade++將文件格式轉換為Unix格式,執行代碼結果如下:

    注意:最后一行必須有換行符,否則該行無法讀取。

13. 配置文件庫使用

    先導入第三方包:

github.com/astaxie/beego/config

    1) 初始化配置庫

iniconf, err := NewConfig("ini", "testini.conf")
if err != nil {
    log.Fatal(err)
}

    2) 讀取配置項

String(key string) string
Int(key string) (int, error)
Int64(key string) (int64, error)
Bool(key string) (bool, error) 
Float(key string) (float64, error)

   例如:

iniconf.String("server::listen_ip")
iniconf.Int("server::listen_port")

[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[logs]
log_level=debug
log_path=./logs/logagent.log

[collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log
 1 package main
 2 
 3 import (
 4     "fmt"
 5     "github.com/astaxie/beego/config"
 6 )
 7 
 8 func main() {
 9     conf, err := config.NewConfig("ini", "./logcollect.conf")
10     if err != nil {
11         fmt.Println("new config failed, err:", err)
12         return
13     }
14 
15     port, err := conf.Int("server::listen_port")
16     if err != nil {
17         fmt.Println("read server:port failed, err:", err)
18         return
19     }
20 
21     fmt.Println("Port:", port)
22     log_level := conf.String("log::log_level")
23     if err != nil {
24         fmt.Println("read log_level failed, ", err)
25         return
26     }
27     fmt.Println("log_level:", log_level)
28 
29     log_path := conf.String("log::log_path")
30     fmt.Println("log_path:", log_path)
31 }
config示例代碼

    配置文件內容:

[server]
listen_ip = "0.0.0.0"
listen_port = 8080

[log]
log_level=debug
log_path=./logs/logagent.log

[collect]
log_path=/home/work/logs/nginx/access.log
topic=nginx_log

    執行結果:

14. 日志庫的使用

        先導入第三方包:

github.com/astaxie/beego/logs

    1) 配置log組件

config := make(map[string]interface{})
config["filename"] = "./logs/logcollect.log"
config["level"] = logs.LevelDebug

configStr, err := json.Marshal(config)
if err != nil {
    fmt.Println("marshal failed, err:", err)
    return
}

    2) 初始化日志組件

logs.SetLogger(“file”, string(configStr))
 1 package main
 2 
 3 import (
 4     "encoding/json"
 5     "fmt"
 6     "github.com/astaxie/beego/logs"
 7 )
 8 
 9 func main() {
10     config := make(map[string]interface{})
11     config["filename"] = "./logcollect.log"
12     config["level"] = logs.LevelDebug
13 
14     configStr, err := json.Marshal(config)
15     if err != nil {
16         fmt.Println("marshal failed, err:", err)
17         return
18     }
19 
20     logs.SetLogger(logs.AdapterFile, string(configStr))
21 
22     logs.Debug("this is a test, my name is %s", "stu01")
23     logs.Trace("this is a trace, my name is %s", "stu02")
24     logs.Warn("this is a warn, my name is %s", "stu03")
25 }
logs示例

 15. 日志收集項目整體實現

    開發環境為Windows系統,go version go1.12.1 windows/amd64, kafka_2.11-2.0.0,zookeeper-3.4.12。

   先實現了一個demo,V1版本:

(1)代碼結構圖

 

(2)代碼地址見本人github:https://github.com/XJL635438451/logCollectProject/tree/master

(3)如何運行

    1)先安裝 go, kafka,zookeeper;

    2)先啟動 zookeeper,然后啟動kafka,下面是啟動的命令;

啟動ZK
.\zkServer.cmd

啟動kafka
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-server-start.bat .\config\server.properties

創建topic
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaTest

啟動生產者:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic kafkaTest

啟動消費者:
F:\Go\project\src\module\kafka_2.11-2.0.0>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic nginx_log  --from-beginning

    3)如果自己不行寫日志文件,可以運行代碼中的 writeLogTest/log.go,然后運行 main.exe (如果自己修改了代碼還需要重新編譯);

    4)可以起一個kafka的consumer來查看日志是否寫入到了kafka,方法就是上面的啟動生產者命令,如果正常就可以看到日志一直在kafka中刷新。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM