海量日志收集項目總結(1)logagent開發
此部分源碼:
項目目錄結構:

各個模塊作用:

各個模塊間的數據流:

簡介
本項目用於系統中日志的采集處理。
對於日志采集業界其實已有成熟的ELK方案。
ELK方案
ELK:
- E:Elasticserach(建立索引)
- L: Logstash(日志采集)
- K: Kibana(可視化展示)

ELK在每台服務器上部署logstash,比較重量級,所以演化成客戶端部署filebeat的EFK,由filebeat收集向logstash中寫數據,最后落地到elasticsearch,通過kibana界面進行日志檢索。
為什么本項目不適用ELK方案?
缺點:
- 運維成本⾼,每增加⼀個⽇志收集項,都需要⼿動修改配置
- 監控缺失,⽆法准確獲取logstash的狀態。⽆法做到定制化開發與維護
- ⽆法做到定制化開發與維護
項目架構

各部分作用
- logagent:這里應自己寫的logagent代替ELK的Logstash進行日志收集
- kafka:分布式消息隊列,具體作用的介紹間我的這篇博客kafka簡介
- logTranfer:用於從Kafka中取出數據送入ES(完全需要自己寫)
- ES:用於對日志建立索引並存儲
- etcd:用於系統配置管理,起到了個數據庫的作用
- kibana:用於對ES里的數據進行可視化展示,呈現的是一個web界面
logagent模塊設計過程
logagent的主要作用是日志的收集,大概可划分為下面幾個步驟:
- 從指定位置讀取日志文件
- 將獨到的內容發往kafka
- 動態的監視(watch)etcd中的數據,以實現熱配置,etcd在這里起到了消息發布訂閱的作用。
讀文件時需要用到tail的第三方庫 tail
github.com/hpcloud/tail
讀取配置文件內容
這里我們使用GitHub上的第三方庫https://github.com/go-ini
具體使用方法見 ini解析配置文件。
建立conf文件夾,目錄結構為:

config.go:
package conf
type AppConf struct {
KafkaConf `ini:"kafka"`
}
type KafkaConf struct {
Address string `ini:"address"`
Topic string `ini:"topic"`
}
config.ini:
[kafka]
address=127.0.0.1:9092
topic=zhouzheng
初始化kafka並發送信息
操作kafka時需要用到第三方庫saramagithub.com/Shopify/sarama
注意如果go的版本為1.13,則go mod中sarama 版本號要改為v1.19.0
首先建立一個kafka的庫,用於存放和kafka相關函數,包括kafka的初始化,以及向kafka發送信息。
目錄結構如下:

kafka.go:
package kafka
import (
"fmt"
"github.com/Shopify/sarama"
)
func Init(address []string,topic string)error{
//初始化kafka配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 發送完數據需要leader和follow都確認
config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition
config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回
// 構造一個消息
msg := &sarama.ProducerMessage{}
msg.Topic = topic
msg.Value = sarama.StringEncoder("yuema")
// 連接kafka
//產生一個生產者客戶端
client, err := sarama.NewSyncProducer(address, config)
if err != nil {
fmt.Println("初始化失敗:, err:", err)
return err
}
defer client.Close()
// 發送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return err
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
return err
}
此時main.go:
package main
import (
"fmt"
"github.com/wind-zhou/Logagent_demo/conf"
"github.com/wind-zhou/Logagent_demo/kafka"
"gopkg.in/ini.v1"
)
var (
cfg = new(conf.AppConf) //初始化一個全局cfg,用作后面配置文件解析映射
)
func main(){
//0. 解析配置文件的內容
err := ini.MapTo(cfg, "./conf/cfg.ini")
if err != nil {
fmt.Printf("解析文件出錯:err=",err)
return
}
fmt.Println("解析文件成功")
fmt.Printf("%v\n",cfg)
//1.初始化kafka
err=kafka.Init([]string{cfg.KafkaConf.Address},cfg.KafkaConf.Topic)
if err!=nil{
fmt.Println("初始化失敗:, err:", err)
return
}
fmt.Println("kafka init success")
}
調試結果為:
E:\goProject\src\github.com\wind-zhou\Logagent_demo>main.exe
解析文件成功
&{{127.0.0.1:9092 zhouzheng}}
pid:0 offset:18
kafka init success
在終端創建消費者,觀察數據:

加入tail模塊
tail模塊可以才指定位置讀取文件信息,並將信息發往kafka,配置文件的位置信息可以寫入配置文件中。
就是把某個檔案文件的最后幾行顯示到終端上,假設該檔案有更新,tail會自己主動刷新,確保你看到最新的檔案內容 ,在日志收集中可以實時的監測日志的變化。
測試tail
package tail
import (
"fmt"
"github.com/hpcloud/tail"
"time"
)
//從指定位置讀取文件內容
func Init(fileName string)(err error){
//fileName := "./my.log"
config := tail.Config{
ReOpen: true, // 重新打開
Follow: true, // 是否跟隨
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 從文件的哪個地方開始讀
MustExist: false, // 文件不存在不報錯
Poll: true,
}
tails, err := tail.TailFile(fileName, config) //創建一個tail對象
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}
var (
line *tail.Line
ok bool
)
for {
line, ok = <-tails.Lines//遍歷chan,讀取日志內容
if !ok {
fmt.Printf("tail file close reopen, filename:%s\n", tails.Filename)
time.Sleep(time.Second)
continue
}
fmt.Println("line:", line.Text)
}
}
此時main.go中調用Init

調試:
在my.log中寫入內容:

終端顯示成功:

至此tail測試成功,接下來任務是把獨到的內容發到kafka
tail連接kafka
問題關鍵就是在tail讀取完后,將數據發往kafka。
構造一個SendToKafka函數,往kafka里發數據,該函數在tail處死話完成后調用。
kafka.go:
創建func SendToKafka()

tail.go
在初始化后調用SendToKafka()

調試:
此時在my.log中寫入數據,應該可以在終端創建的kafka消費者中讀取到。


測試成功。
添加etcd模塊
上面的日志產生收集和發送都只涉及到了單個topic,如果不同的業務線則產生會不同的topic日志,且存儲在不同位置,這個時候就需要對他們進行批量的管理,如要求系統啟動時要開啟n個任務,從n個業務線分別拉去日志,此刻etcd派上了用場(etcd本質上是個數據庫,只是他可以實現許多功能,如消息的發布訂閱,配置管理等),
關於etcd的操作需要下載第三方庫,這個庫目前為止,由於版本兼容性等問題,使用時可能會有不少坑,具體問題見我這兩篇博客
關於etcd的及具體操作:
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
// etcd client put/get demo
// use etcd/clientv3
type LogEntry struct {
Path string`jaon:"path"`//日志存放路徑
Topic string`json:"topic"`//要發往kafka那個topic
}
var LogEtcdConf =make([]LogEntry,1000)
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
// handle error!
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
value:=`[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/linux.log","topic":"linux_log"}]
`
_, err = cli.Put(ctx, "xxx", value)
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "xxx",clientv3.WithPrefix())
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs { //遍歷切片得到結構體指針
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
//對接受的數據反序列化
err=json.Unmarshal(ev.Value,&LogEtcdConf)//返回的ev.Value也是切片類型
if err!=nil{
fmt.Printf("json.Unmarshal failed ,err=%V\n",err)
return
}
}
for _,value:=range LogEtcdConf{
fmt.Printf("%v\n",value)
}
}
代碼說明:
這段代碼用golang操作etcd實現的put和get操作,etcd為一個分布式的KEY-VALUE存儲結構,put時。key=”xxx“,value=[{"path":"c:/tmp/nginx.log","topic":"web_log"},{"path":"d:/xxx/redis.log","topic":"redis_log"},{"path":"d:/xxx/linux.log","topic":"linux_log"}]
這里resp.Kvs是一個切片,只是這個例子中,切片的數據只有一個。
value是不同的主題的日志的配置信息組成的切片,然后將其json序列化,因此是一個字符串。
測試結果:

etcd.go
由此可知,我們可以預先將value信息存入etcd(或通過client終端,或通過golang),然后再程序中使用etcd的GET拉去這些信息,並根據這些信息去讀取和發送日志。
上面這些我們可以設置一個初始化函數來完成這項工作。
etcd.go:
Init()

GetConf()

注意:
這里的etcd初始化要放在tail初始化之前,因為tail要根據etcd拉取的配置創建對象,這里由於變成的多個topic,因此需要對tail進行改進。
tail模塊改進
因為涉及到多個topic日志采集,每個topic都需要建立一個對象,因此要建立一個結構體對其進行管理。
這時logagent的邏輯變為:
- 初始化etcd(就是和etcd集群間連接)
- 從etcd中拉取拉取信息(內容為不同的topic及其存儲的路徑)
- 初始化kafka(和kafka集群建立連接)
- 遍歷再etcd拉取的信息,每個topic建立一個tailObj實例。(用於去不同的文件讀取日志)
- 再每次建立tailObj實例時,創建一個goroutine,往kafka里發送數據。
tail模塊的目錄結構:

tail.go

上面代碼中最后要加一個循環等待,否則goroutine會因主線程退出而銷毀。
tail_mgr.go
package tail
import (
"fmt"
"github.com/hpcloud/tail"
"github.com/wind-zhou/Logagent_demo/kafka"
"time"
)
//為了管理不同的tail對象,建立一個結構體
type TailTask struct {
Path string //存放日志路徑
Topic string //日志主題
instance *tail.Tail //tail對象實例
}
//構造一個函數,對每個任務初始化
func NewTailTask(path string,topic string)(tailTaskObj *TailTask){
tailTaskObj=&TailTask{
Path: path,
Topic: topic,
}
tailTaskObj.instance=tailTaskObj.Init(path)
return
}
//用來初始化任務結構體中的實例
func (t *TailTask)Init(path string)(tailObj *tail.Tail){
//初始化配置
config := tail.Config{
ReOpen: true, // 重新打開
Follow: true, // 是否跟隨
Location: &tail.SeekInfo{Offset: 0, Whence: 2}, // 從文件的哪個地方開始讀
MustExist: false, // 文件不存在不報錯
Poll: true,
}
tailObj, err := tail.TailFile(path, config) //創建一個tail對象
if err != nil {
fmt.Println("tail file failed, err:", err)
return
}
return
}
//往kafka發送數據
func (t *TailTask)run(topic string){
for {
select {
case line:= <-t.instance.Lines: //從tail中取數據
kafka.SendToKafka(topic,line.Text) //把讀取的數據發往kafka
default:
time.Sleep(time.Second)
continue
}
}
}
main.go
package main
import (
"fmt"
"github.com/wind-zhou/Logagent_demo/conf"
"github.com/wind-zhou/Logagent_demo/etcd"
"github.com/wind-zhou/Logagent_demo/kafka"
"github.com/wind-zhou/Logagent_demo/tail"
"gopkg.in/ini.v1"
)
var (
cfg = new(conf.AppConf)
LogEtcdConf =make([]*etcd.LogEntry,1000) //用於接受etcd拉取的配置
)
func main(){
//0. 解析配置文件的內容
err := ini.MapTo(cfg, "./conf/cfg.ini")
if err != nil {
fmt.Printf("解析文件出錯:err=",err)
return
}
fmt.Println("解析文件成功")
fmt.Printf("%v\n",cfg)
//1.初始化etcd
err=etcd.Init([]string{cfg.EtcdConf.Address},cfg.EtcdConf.Timeout)
if err != nil {
fmt.Printf("初始化etcd失敗,err=%v\n",err)
return
}
fmt.Println("connect to etcd success")
//拉取信息
LogEtcdConf,err=etcd.GetConf(cfg.EtcdConf.Key)
if err != nil {
fmt.Printf("拉取配置失敗:err=%v\n",err)
return
}
fmt.Println("拉取配置成功")
//顯示拉取的信息
for _,value:=range LogEtcdConf{
fmt.Printf("%v\n",value)
}
//2.初始化kafka
//就是連上kafka
err=kafka.Init([]string{cfg.KafkaConf.Address})
if err!=nil{
fmt.Println("初始化失敗:, err:", err)
return
}
fmt.Println("kafka init success")
//3. 初始化tail
tail.Init(LogEtcdConf)
fmt.Println("tail init success")
}
調試結果為:


測試結果表明,程序運行后可以建立連接,並讀取數據,最后通過自建的kafka消費者終端可以觀察到數據。
優化:增加配置熱更改功能
要求:
程序運行時時,如果更改etcd中的信息,例如增加一個topic項,后刪除一個topic項,系統可以根據變化動態的創建和銷毀進程。
這里需要用到etcd的Watch功能,即派一個哨兵去監視信息的變化。
watch測試
先看一下etcd的Watch的輸出格式:
wtach的代碼:

當更新etcd時:

這是程序返回結果:

有上述實驗可知,系統可以檢測到etcd的變化,那接下來的問題就是,如何將這種變化告訴tail,並動態的控制goroutine。答案是:通道。
再watch到變化后,將value放入到一個通道中,在tail模塊中,創建一個goroutine在后台不斷的監聽通道里的數據,如果拿到數據,就執行相應操作。
etcd.go
這部分的核心代碼是 WatchConf()函數

主要的作用是取出信息,發往通道,這里要提一下的是判斷更i新的類型,只有兩種類型PUT和DELETE

tail.go
拿到更新后,tail模塊的goroutine讀取通道信息,並發往kafak。
package tail
import (
"fmt"
"github.com/wind-zhou/Logagent_demo/etcd"
"time"
)
var tskMgr *taillogMgr
//從指定位置讀取文件內容
func Init(LogEtcdConf []*etcd.LogEntry){
tskMgr=&taillogMgr{
logEntry: LogEtcdConf,//把當前日志收集項配置信息保存起來
tskMap:make(map[string]*TailTask,16),//用來存儲各個task
newConfChan: make(chan []*etcd.LogEntry),//無緩沖區通道,用來接收熱更改配置
}
//1.遍歷切片,得到每個tailtask
for _,logentry := range LogEtcdConf { //遍歷切片得到結構體指針
tailObj:=NewTailTask(logentry.Path,logentry.Topic)//創建了一個tail實例
mk:=fmt.Sprintf("%s_%s",logentry.Path,logentry.Topic)//用path和topic拼接一個字符串用作每次的key值
tskMgr.tskMap[mk]=tailObj//將實例放入map
}
//test
fmt.Println("開始的數據項為:")
for k,v:=range tskMgr.tskMap{
fmt.Printf("%v:%v\n",k,v)
}
//select { }//這個select的作用是,等待時間,為了是等待協程,也可以是for{}
//創建一個goroutine,在后台默默的監聽通道,一讀取etcd變化的信息,並處理
go tskMgr.run()//負責從chan讀取更新的配置
}
//監聽自己的newConfChan,有了新配置就處理
func (t *taillogMgr)run(){
for {
select {
case newConf:= <- t.newConfChan:
//
//fmt.Println("--------------------")
//fmt.Println("tail模塊收到的配置為:")
//for _,value:=range newConf{
// fmt.Printf("%v\n",value)
//}
//fmt.Println("--------------------")
//1.判斷有沒有新增的配置項
for _,conf:=range newConf{
mk:=fmt.Sprintf("%s_%s",conf.Path,conf.Topic)
_,ok:=t.tskMap[mk]//判斷是否該項是否為原來的配置項
if ok{
//原來就有
continue
}else {
//新增的
tailObj:=NewTailTask(conf.Path,conf.Topic)//NewTailTask會根據配置文件建立和日志的聯系,並讀取
fmt.Printf("tail task %s_%s 啟動了了\n",tailObj.Path,tailObj.Topic)
t.tskMap[mk]=tailObj
}
}
//2.判斷有沒有刪除的配置項
//找出t.logEntry有但newconf沒有的,刪除掉
for _,c1:=range t.logEntry{ //從原來配置中一次拿出配置項,去新的配置中逐一比較
isDelete:=true
for _,c2:=range newConf {
if c2.Path == c1.Path && c2.Topic == c1.Topic {
isDelete = false
continue
}
}
if isDelete{
//把c1對應的這個tailObj停掉,怎么停掉這個之前的協程呢?用context
mk:=fmt.Sprintf("%s_%s",c1.Path,c1.Topic)
t.tskMap[mk].canCel()
delete(t.tskMap, mk) //刪除記錄
//fmt.Println("*********************************")
//fmt.Println("刪除某個日志項后,系統存儲的日志項為:")
//
//for _,v:=range t.tskMap{
// fmt.Printf("%v\n",v)
//}
//fmt.Println("*********************************")
}
}
//2.配置刪除
fmt.Println("新配置來了",newConf)
default:
time.Sleep(time.Second)
}
}
}
//向外暴露一個通道
func NewConfChan() chan<- []*etcd.LogEntry{
return tskMgr.newConfChan
}
這塊代碼的復雜之處是根據拿到的信息判斷里面各個日志收集項的具體變化,判斷是開啟還是關閉goroutine。
tail_mgr.go


這兩個地方是通過context控制進程的退出,每次開啟新的TailTask時就創建一個上下文。
main.go

主函數中開啟一個協程,不斷的在后台watch etcd的變化。
這個模塊許多的地方的細節都需要注意,例如:
如何在不同的模塊使用一個通道交流數據,這里時構造了一個函數,用於暴露通道,並且,通道是引用類型,因此賦值時相當於傳遞了自身的地址。
如何使用context控制goroutine退出。
還有一些基礎的東西例如,同步的鎖的使用,map,切片的操作等
調試結果:
最開始系統有三個配置項:

然后運行另外一個程序將配置項改為兩個:

這是在系統可以檢測到配置的變化:

這是在 d:/xxx/linux.log_linux_log下日志,那么kafka將接受不到信息:
此時若再次把配置改為三個,那么linux_log主題日志將會重新發往kafka。


測試成功。
>目前這里有還一個bug:
>當開始時三個日志項,然后切換從兩個日志項時,系統可以識別變化,然后再切換成三個日志項,系統也能識別變化,但若開始時兩個日志項,第一次切換成三個日志項時可以識別變化,但若再次切換成兩個時,系統便不能識別,再新配置中被刪除的日志項的goroutine無法被關閉。
>
logagent的優化
本模塊中tail在讀取到日志內容后,會直接調用kafka.SendToKafka函數往kafka發數據,這里函數調用函數,是一個同步操作,因此當tail讀取日志和發往kafka速度不匹配時就會使一方進行等待,浪費性能。
改進的方法是通過通道使同步操作變成異步操作(是不是和消息隊列很像!)
之前的方式:


改進后:
改進后相當於把之前的動作拆成了兩部分:
- tail拿到數據發往通道
- 起一個goroutine不斷地在通道中取數據(在kafka初始化時調用此goroutine)
tail_mgr.go

kafka.go

第一次寫項目博客
爛的我自己都看不下去,僅作為自己練筆練吧
推薦大家看另外一篇大佬的博客(海量日志收集項目)[https://www.cnblogs.com/zhaof/tag/go實現海量日志收集/]
