GO開發:用go寫個日志監控系統


日志收集系統架構

1.項目背景

a. 每個系統都有日志,當系統出現問題時,需要通過日志解決問題

b. 當系統機器比較少時,登陸到服務器上查看即可滿足

c. 當系統機器規模巨大,登陸到機器上查看幾乎不現實

2.解決方案

a. 把機器上的日志實時收集,統一的存儲到中心系統

b. 然后再對這些日志建立索引,通過搜索即可以找到對應日志

c. 通過提供界面友好的web界面,通過web即可以完成日志搜索

面臨的問題

a. 實時日志量非常大,每天幾十億條

b. 日志准實時收集,延遲控制在分鍾級別

c. 能夠水平可擴展

ELK介紹

•官網https://www.elastic.co/cn/

• 中文指南https://www.gitbook.com/book/chenryn/elk-stack-guide-cn/details

• ELKStack (5.0版本之后)--> ElasticStack == (ELKStack + Beats)

• ELK Stack包含:ElasticSearch、Logstash、Kibana

• ElasticSearch是一個搜索引擎,用來搜索、分析、存儲日志。它是分布式的,也就是說可以橫向擴容,可以自動發現,索引自動分片,總之很強大。文檔https://www.elastic.co/guide/cn/elasticsearch/guide/current/index.html

• Logstash用來采集日志,把日志解析為json格式交給ElasticSearch。

• Kibana是一個數據可視化組件,把處理后的結果通過web界面展示

• Beats在這里是一個輕量級日志采集器,其實Beats家族有5個成員

• 早期的ELK架構中使用Logstash收集、解析日志,但是Logstash對內存、cpu、io等資源消耗比較高。相比 Logstash,Beats所占系統的CPU和內存幾乎可以忽略不計

• x-pack對ElasticStack提供了安全、警報、監控、報表、圖表於一身的擴展包,是收費的

elk

elk方案問題

a. 運維成本高,每增加一個日志收集,都需要手動修改配置

b. 監控缺失,無法准確獲取logstash的狀態

c. 無法做定制化開發以及維護

日志收集系統設計

kafka

Kafka消息隊列

數據解耦

a. Log Agent,日志收集客戶端,用來收集服務器上的日志

b. Kafka,高吞吐量的分布式隊列,linkin開發,apache頂級開源項目

c. ES,elasticsearch,開源的搜索引擎,提供基於http restful的web接口

d. Hadoop,分布式計算框架,能夠對大量數據進行分布式處理的平台

zookeeper

Zookeeper 作為一個分布式的服務框架,主要用來解決分布式集群中應用系統的一致性問題,它能提供基於類似於文件系統的目錄節點樹方式的數據存儲, Zookeeper 作用主要是用來維護和監控存儲的數據的狀態變化,通過監控這些數據狀態的變化,從而達到基於數據的集群管理

簡單的說,zookeeper=文件系統+通知機制

a. 安裝JDK,從oracle下載最新的SDK安裝

b. 安裝zookeeper3.3.6,下載地址:http://apache.fayea.com/zookeeper/

1)mv conf/zoo_sample.cfg conf/zoo.cfg

2)編輯 conf/zoo.cfg,修改dataDir

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper/data
# the port at which the clients will connect
clientPort=2181
dataLogDir=/tmp/zookeeper/log

3)vim /etc/profile

export PATH=$PATH:/usr/local/zookeeper/bin

source /etc/profile

運行:

[root@greg02 zookeeper]#zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

kafka

1.打開鏈接:http://kafka.apache.org/downloads.html

下載https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.2/kafka_2.12-0.11.0.2.tgz

2.打開config目錄下的server.properties, 修改log.dirs為D:\kafka_logs,修改advertised.host.name=服務器ip

3.啟動kafka

[root@greg02 kafka]#kafka-server-start.sh config/server.properties 

kafka消費者開啟

[root@greg02 kafka]#kafka-console-consumer.sh --topic nginx_log --zookeeper 127.0.0.1 2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[2018-02-05 18:30:22,451] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
[2018-02-05 18:30:22,597] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)

go kafka

package main

import (
   "fmt"
   "time"
   "github.com/Shopify/sarama"
)

func main() {
   config := sarama.NewConfig()
   config.Producer.RequiredAcks = sarama.WaitForAll
   config.Producer.Partitioner = sarama.NewRandomPartitioner
   config.Producer.Return.Successes = true

   client, err := sarama.NewSyncProducer([]string{"192.168.179.130:9092"}, config)
   if err != nil {
      fmt.Println("producer close, err:", err)
      return
   }

   defer client.Close()
   msg := &sarama.ProducerMessage{}
   msg.Topic = "nginx_log"
   msg.Value = sarama.StringEncoder("this is a good test, my message is good")

   pid, offset, err := client.SendMessage(msg)
   if err != nil {
      fmt.Println("send message failed,", err)
      return
   }

   fmt.Printf("pid:%v offset:%v\n", pid, offset)
   time.Sleep(10 * time.Millisecond)
}

linux tail命令

​ -f 用於循環讀取文件的內容,監視文件的增長

​ -F 與-f類似,區別在於當將監視的文件刪除重建后-F仍能監視該文件內容-f則不行,-F有重試的功能,會不斷重試

package main

import (
   "fmt"
   "github.com/hpcloud/tail"
   "time"
)
func main() {
   filename := "/root/passwd"
   tails, err := tail.TailFile(filename, tail.Config{
      ReOpen:    true,
      Follow:    true,
      //Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
      MustExist: false,  
         Poll:      true,
   })
   if err != nil {
      fmt.Println("tail file err:", err)
      return
   }
   var msg *tail.Line
   var ok bool
   for true {
      msg, ok = <-tails.Lines
      if !ok {
         fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
         time.Sleep(100 * time.Millisecond)
         continue
      }
      fmt.Println("msg:", msg)
   }
}

配置文件庫使用

  1. 初始化配置庫

    iniconf, err := NewConfig("ini", "testini.conf")
    if err != nil {
        t.Fatal(err)
    }
    
  2. 讀取配置項

	•	String(key string) string
	•	Int(key string) (int, error)
	•	Int64(key string) (int64, error)
	•	Bool(key string) (bool, error)
	•	Float(key string) (float64, error)

cofig的go實現

package main

import (
   "fmt"
   "github.com/astaxie/beego/config"
)

func main() {
   conf, err := config.NewConfig("ini", "./logagent.conf")
   if err != nil {
      fmt.Println("new config failed, err:", err)
      return
   }

   port, err := conf.Int("server::port")
   if err != nil {
      fmt.Println("read server:port failed, err:", err)
      return
   }

   fmt.Println("Port:", port)
   log_level := conf.String("logs::log_level")
   if len(log_level) == 0 {
      log_level = "debug"
   }

   fmt.Println("log_level:", log_level)

   log_path := conf.String("logs::log_path")
   fmt.Println("log_path:", log_path)
}

日志庫的使用

  1. 配置log組件

         config := make(map[string]interface{})
    	config["filename"] = "./logs/logcollect.log"
    	config["level"] = logs.LevelDebug
    
    	configStr, err := json.Marshal(config)
    	if err != nil {
    		fmt.Println("marshal failed, err:", err)
    		return
    	}
    
  2. 初始化日志組件

    logs.SetLogger(“file”, string(configStr))
    

    寫日志

    package main
    
    import (
       "encoding/json"
       "fmt"
       "github.com/astaxie/beego/logs"
    )
    
    func main() {
       config := make(map[string]interface{})
       config["filename"] = "/root/logs/logcollect.log"
       config["level"] = logs.LevelDebug
    
       configStr, err := json.Marshal(config)
       if err != nil {
          fmt.Println("marshal failed, err:", err)
          return
       }
    
       logs.SetLogger(logs.AdapterFile, string(configStr))
    
       logs.Debug("this is a test, my name is %s", "stu01")
       logs.Trace("this is a trace, my name is %s", "stu02")
       logs.Warn("this is a warn, my name is %s", "stu03")
    }
    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM