Go實現海量日志收集系統(四)


到這一步,我的收集系統就已經完成很大一部分工作,我們重新看一下我們之前畫的圖:

我們已經完成前面的部分,剩下是要完成后半部分,將kafka中的數據扔到ElasticSearch,並且最終通過kibana展現出來

ElasticSearch

官網地址這里介紹了非常詳細的安裝方法:
https://www.elastic.co/downloads/elasticsearch

但是其實這里是需要配置一些東西的,要不然直接啟動是會悲劇的,在網上找了一個地址,如果出現類似的錯誤直接處理就行,我自己已經驗證了:
https://blog.csdn.net/liangzhao_jay/article/details/56840941

如下圖所示就表示已經安裝完成:

 

 

 

 

 

 通過go寫一個簡單的調用ElasticSearch的例子:

package main

import (
    "fmt"
    elastic "gopkg.in/olivere/elastic.v2"
)

type Tweet struct{
    User string
    Message string
}

func main(){
    client,err := elastic.NewClient(elastic.SetSniff(false),elastic.SetURL("http://192.168.0.118:9200/"))
    if err != nil{
        fmt.Println("connect es error",err)
        return
    }
    fmt.Println("conn es succ")
    tweet := Tweet{User:"olivere name",Message:"Take Five"}
    _, err = client.Index().Index("twitter").Type("tweet").Id("1").BodyJson(tweet).Do()
    if err != nil {
        panic(err)
        return
    }
    fmt.Println("insert succ")
}

logtransfer

logtransfer主要負責從 kafka隊列中讀取日志信息,並且添加到ElasticSearch中

看那一下logtransfer 目錄結構如下:

├── conf
│   └── app.conf
├── es.go
├── etcd.go
├── ip.go
├── kafka.go
├── logs
│   └── transfer.log
└── main.go

conf:存放配置文件
es.go:主要是連接ElasticSearch的部分以及用於將消息放到ElasticSearch中
etcd.go:主要用於做動態的配置更改,當我們需要將kafka中的哪些topic日志內容扔到ElasticSearch中
ip.go: 用於獲取當前服務器的ip地址
kafka.go: 主要是kafka的處理邏輯,包括連接kafka以及從kafka中讀日志內容
main.go:代碼的入口函數

整體大代碼框架,通過如圖展示:

和之前的logagent中的代碼有很多啟示是可以復用的或者稍作更改,就可以了,其中es之心的,主要是連接ElasticSearch並將日志內容放進去

es.go的代碼內容為:

package main

import (
    "gopkg.in/olivere/elastic.v2"
    "github.com/astaxie/beego/logs"
    "sync"
    "encoding/json"
)

var waitGroup sync.WaitGroup

var client *elastic.Client

func initEs(addr string,) (err error){
    client,err = elastic.NewClient(elastic.SetSniff(false),elastic.SetURL(addr))
    if err != nil{
        logs.Error("connect to es error:%v",err)
        return
    }
    logs.Debug("conn to es success")
    return
}

func reloadKafka(topicArray []string) {
    for _, topic := range topicArray{
        kafkaMgr.AddTopic(topic)
    }
}

func reload(){
    //GetLogConf() 從channel中獲topic信息,而這部分信息是從etcd放進去的
    for conf := range GetLogConf(){
        var topicArray []string
        err := json.Unmarshal([]byte(conf),&topicArray)
        if err != nil {
            logs.Error("unmarshal failed,err:%v conf:%v",err,conf)
            continue
        }
        reloadKafka(topicArray)
    }
}

func Run(esThreadNum int) (err error) {
    go reload()
    for i:=0;i<esThreadNum;i++{
        waitGroup.Add(1)
        go sendToEs()
    }
    waitGroup.Wait()
    return
}

type EsMessage struct {
    Message string
}

func sendToEs(){
    // 從msgChan中讀取日志內容並扔到elasticsearch中
    for msg:= range GetMessage() {
        var esMsg EsMessage
        esMsg.Message = msg.line
        _,err := client.Index().Index(msg.topic).Type(msg.topic).BodyJson(esMsg).Do()
        if err != nil {
            logs.Error("send to es failed,err:%v",err)
            continue
        }
        logs.Debug("send to es success")
    }
    waitGroup.Done()
}

最終我將logagnet以及logtransfer部署到虛擬機上進行測試的效果是:

 

 

這樣當我再次查日志的時候就可以不用登陸每台服務器去查日志,只需要通過頁面根據關鍵字迅速看到相關日志,當然目前實現的功能還是有點粗糙,etcd的更改程序,是自己寫的發送程序,其實更好的解決方法是通過頁面,讓用戶點來點去,來控制自己要收集哪些日志,以及自己要將哪些topic的日志從kafka中放到ElasticSearch (本人是做后端開發,不擅長前端的開發,不過后面可以試着寫個頁面試試,估計會很丑哈哈)

同時這里關於各個部分的安裝並沒有做過多的介紹,以及維護,當然我們的目標是是通過這些開源的軟件以及包來實現我們想要的功能,后期的維護,肯定需要對各個組件部分都進行深入了解

這里附贈一下那個etcd客戶端代碼:

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "golang.org/x/net/context"
)

var logconf = `
[
    {
        "topic":"eslservice_log",
        "log_path":"/opt/pbx/log/eslservice.log",
        "service":"eslservice",
        "send_rate":50000
    }
]
`

var test111 = `
[
    {
        "topic":"test_log",
        "log_path":"D:/a.log",
        "service":"test",
        "send_rate":50000
    }
]
`


var transconf = `
[
    "eslservice_log"
]
`

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.90.78:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:",err)
        return
    }
    fmt.Println("connect success")
    defer cli.Close()
    ctx,cancel := context.WithTimeout(context.Background(),time.Second)
    //_,err = cli.Put(ctx,"/logagent/192.168.90.11/log_config",logconf)
    //_,err = cli.Put(ctx,"/logagent/192.168.90.61/log_config",test111)
    _, err = cli.Put(ctx,"/logtransfer/192.168.90.11/log_config",transconf)
    cancel()
    if err != nil {
        fmt.Println("put failed ,err:",err)
        return
    }
    ctx,cancel = context.WithTimeout(context.Background(),time.Second)
    resp,err := cli.Get(ctx,"/logtransfer/",clientv3.WithPrefix())
    cancel()
    if err != nil {
        fmt.Println("get failed,err:",err)
        return
    }
    for _,ev:=range resp.Kvs{
        fmt.Printf("%s:%s\n",ev.Key,ev.Value)
    }
}

到目前為止基本的功能都已經實現了,當然了現在的代碼結構還有的糙,后面會進行優化!
整個項目中的代碼:
logagent代碼地址:https://github.com/pythonsite/logagent
logtransfer代碼地址:https://github.com/pythonsite/logtransfer

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM