最近在學go,所以就用go寫了一個腳本,讀取服務器的log日志,根據正則匹配,從log日志中匹配想要的內容,然后存到influxdb數據庫作為數據源,最后將數據在grafana中展示
下面寫一下詳細的安裝步驟:
首先我找了一個服務器,在服務器上先進行安裝influxdb、安裝go、安裝grafana。
因為我找的服務器系統有點老,是centos6.5,所以安裝跟高版本的還是不太一樣
安裝grafana:
1、下載安裝包:
wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.6.3-1.x86_64.rpm --no-check-certificate

2、安裝環境依賴
yum install initscripts
yum install fontconfig
yum install freetype*
yum install urw-fonts
安裝grafana服務
rpm -Uvh grafana-4.6.3-1.x86_64.rpm
3、安裝完成后修改配置文件/etc/grafana/grafana.ini
配置文件可以根據說明自行修改需要的行即可
4、安裝后這些文件的存放路徑
安裝二進制文件 /usr/sbin/grafana-server
將init.d腳本復制到 /etc/init.d/grafana-server
安裝默認文件(環境變量) /etc/sysconfig/grafana-server
將配置文件復制到 /etc/grafana/grafana.ini
安裝systemd服務(如果systemd可用)名稱 grafana-server.service
默認配置使用日志文件 /var/log/grafana/grafana.log
5、grafana服務
啟動:service grafana-server start
停止:service grafana-server stop
重啟:service grafana-server restart
加入開機自啟動: chkconfig --add grafana-server on
6、安裝zabbix插件
grafana-cli plugins install alexanderzobnin-zabbix-app
7、安裝其他面板插件
例:
grafana-cli plugins install grafana-clock-panel
grafana-cli plugins install grafana-piechart-panel
grafana-cli plugins install raintank-worldping-app
grafana-cli plugins install jasonlashua-prtg-datasourc
grafana-cli plugins install jasonlashua-prtg-datasource g
rafana-cli plugins install grafana-worldmap-panel
8、卸載插件
例:
grafana-cli plugins uninstall jasonlashua-prtg-datasource
grafana-cli plugins uninstall raintank-worldping-app
9、安裝或卸載完成后,需要重啟服務
service grafana-server restart
10、驗證
查看端口3000 是否監聽:netstat nuplt
rpm -qa | grep -i grafana
查看運行情況命令:
瀏覽器輸入http://ip:3000 查看登錄頁面是否正常:
出現這個頁面說明安裝正常:賬號和密碼都默認admin
安裝influx:
1、wget https://dl.influxdata.com/influxdb/releases/influxdb-1.7.8.x86_64.rpm
3、sudo service influxdb start

提示:在用go語言定義字段的類型的時候要注意跟數據表的類型對應上,否則會出現插入不到表的情況,會提示類型錯誤之類的問題。
influx跟mysql不一樣,它沒有專門的創建表的語句,它在執行插入操作的時候就默認建了一張表,下面的例子就是默認access_test表不存在,直接執行inser操作插入一條數據
關於go的代碼:
1、我本地測試的,在目錄下有一個access.log日志的腳本
2、代碼文件log文件進行讀取log日志,寫入influx數據庫,這里強調一下
代碼,需要下載go的clinet,地址:https://github.com/influxdata/influxdb-client-go
否則無法連接
log文件代碼:
package main
import (
"bufio"
"flag"
"fmt"
client "github.com/influxdata/influxdb1-client/v2"
"io"
"log"
"net/http"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
"encoding/json"
)
type Reader interface {
Read(rc chan []byte)
}
type Writer interface {
Write(wc chan *Message)
}
type LogProcess struct {
rc chan []byte
wc chan *Message
read Reader
write Writer
}
type ReadFromFile struct {
path string // 讀取文件的路徑
}
type WriteToInfluxDB struct {
influxDBDsn string // influx data source
}
type Message struct {
TimeLocal time.Time
BytesSent float64
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}
// 系統狀態監控
type SystemInfo struct {
HandleLine int `json:"handleLine"` // 總處理日志行數
Tps float64 `json:"tps"` // 系統吞出量
ReadChanLen int `json:"readChanLen"` // read channel 長度
WriteChanLen int `json:"writeChanLen"` // write channel 長度
RunTime string `json:"runTime"` // 運行總時間
ErrNum int `json:"errNum"` // 錯誤數
}
const (
TypeHandleLine = 0
TypeErrNum = 1
)
var TypeMonitorChan = make(chan int, 200)
type Monitor struct {
startTime time.Time
data SystemInfo
tpsSli []int
}
func (m *Monitor) start(lp *LogProcess) {
go func() {
for n := range TypeMonitorChan {
switch n {
case TypeErrNum:
m.data.ErrNum += 1
case TypeHandleLine:
m.data.HandleLine += 1
}
}
}()
ticker := time.NewTicker(time.Second * 5)
go func() {
for {
<-ticker.C
m.tpsSli = append(m.tpsSli, m.data.HandleLine)
if len(m.tpsSli) > 2 {
m.tpsSli = m.tpsSli[1:]
}
}
}()
http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
m.data.RunTime = time.Now().Sub(m.startTime).String()
m.data.ReadChanLen = len(lp.rc)
m.data.WriteChanLen = len(lp.wc)
if len(m.tpsSli) >= 2 {
m.data.Tps = float64(m.tpsSli[1]-m.tpsSli[0]) / 5
}
ret, _ := json.MarshalIndent(m.data, "", "\t")
io.WriteString(writer, string(ret))
})
http.ListenAndServe(":9193", nil)
}
func (r *ReadFromFile) Read(rc chan []byte) {
// 讀取模塊
fmt.Println("start readfromfile")
// 打開文件
fmt.Println(r.path)
f, err := os.Open(r.path)
fmt.Println(f)
if err != nil {
panic(fmt.Sprintf("open file error:%s", err.Error()))
}
// 從文件末尾開始逐行讀取文件內容
//f.Seek(0, 2)
rd := bufio.NewReader(f)
fmt.Println("rd",rd)
for {
fmt.Println("for")
line, err := rd.ReadBytes('\n')
fmt.Println("line:",line)
fmt.Println("err:",err)
fmt.Println("ioeof:",io.EOF)
fmt.Println("linetorc",line[:len(line)-1])
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error:", err.Error()))
}
TypeMonitorChan <- TypeHandleLine
rc <- line[:len(line)-1]
fmt.Println("read foreach success",rc)
}
fmt.Println("read success")
}
func (w *WriteToInfluxDB) Write(wc chan *Message) {
// 寫入模塊
fmt.Println("writetoinfluxdb")
// Create a new HTTPClient
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: "http://127.0.0.1:8086",
Username: "admin",
Password: "",
})
fmt.Println("client.newhttpclient",c)
if err != nil {
fmt.Println("err")
log.Fatal(err)
}
fmt.Println("wc",wc)
for v := range wc {
fmt.Println("wccccc")
// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: "mylogdb",
Precision: "s",
})
fmt.Println("bp:",bp)
if err != nil {
log.Fatal(err)
}
// Create a point and add to batch
// Tags: Path, Method, Scheme, Status
tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status}
// Fields: UpstreamTime, RequestTime, BytesSent
fields := map[string]interface{}{
"UpstreamTime": v.UpstreamTime,
"RequestTime": v.RequestTime,
"BytesSent": v.BytesSent,
}
pt, err := client.NewPoint("log_test", tags, fields, v.TimeLocal)
fmt.Println("pt:",pt)
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)
// Write the batch
if err := c.Write(bp); err != nil {
log.Fatal(err)
}
log.Println("write success!")
fmt.Println("write foreach success!")
}
fmt.Println("write success")
}
func (l *LogProcess) Process() {
// 解析模塊
/**
172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
*/
fmt.Println("l logprocess")
r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`)
loc, _ := time.LoadLocation("Asia/Shanghai")
fmt.Println("for process lrc",l.rc)
for v := range l.rc {
ret := r.FindStringSubmatch(string(v))
fmt.Println("logprocessret",ret[5])
fmt.Println("ret length:", len(ret))
if len(ret) != 14 {
TypeMonitorChan <- TypeErrNum
log.Println("FindStringSubmatch fail:", string(v))
continue
}
message := &Message{}
t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
TypeMonitorChan <- TypeErrNum
log.Println("ParseInLocation fail:", err.Error(), ret[4])
continue
}
message.TimeLocal = t
byteSent, _ := strconv.Atoi(ret[8])
//byteSent, _ := ret[8]
message.BytesSent = float64(byteSent)
// GET /foo?query=t HTTP/1.0
reqSli := strings.Split(ret[6], " ")
if len(reqSli) != 3 {
TypeMonitorChan <- TypeErrNum
log.Println("strings.Split fail", ret[6])
continue
}
message.Method = reqSli[0]
u, err := url.Parse(reqSli[1])
if err != nil {
log.Println("url parse fail:", err)
TypeMonitorChan <- TypeErrNum
continue
}
message.Path = u.Path
message.Scheme = ret[5]
message.Status = ret[7]
upstreamTime, _ := strconv.ParseFloat(ret[12], 64)
requestTime, _ := strconv.ParseFloat(ret[13], 64)
message.UpstreamTime = upstreamTime
message.RequestTime = requestTime
l.wc <- message
fmt.Println("logprocess foreach success")
}
fmt.Println("logprocess success")
}
func main() {
//定義路徑、數據庫變量
var path, influxDsn string
//變量path、influxdsn賦值
flag.StringVar(&path, "path", "/Users/luoyan3/go/src/go-grafana-log/access_test.log", "read file path")
flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@admin@@mylogdb@s", "influx data source")
flag.Parse()
fmt.Println("accccccess_log_path",path)
r := &ReadFromFile{
path: path,
}
fmt.Println("readfromfile:", r)
w := &WriteToInfluxDB{
influxDBDsn: influxDsn,
}
fmt.Println("writetoinfluxdb:",w)
lp := &LogProcess{
rc: make(chan []byte, 200),
wc: make(chan *Message, 200),
read: r,
write: w,
}
fmt.Println("logprocess:", lp.wc)
go lp.read.Read(lp.rc)
for i := 0; i < 2 ; i++ {
fmt.Println("for one")
go lp.Process()
}
fmt.Println("wccccc", lp.wc)
for i := 0; i < 4 ; i++ {
fmt.Println("for two")
go lp.write.Write(lp.wc)
}
m := &Monitor{
startTime: time.Now(),
data: SystemInfo{},
}
m.start(lp)
fmt.Println("success")
}
我這個腳本是可以跑通的。
最后在grafana展示的結果是:

最后總結一下,在linux服務器上安裝一些東西的時候會有很多報錯信息,不要擔心,根據報錯的內容去谷歌一下看看還需要下載什么,挨個安裝就可以了。最后總會安裝成功的。
后期自己會把這一套用在自己的一些項目上,可以看看自己項目的log日志對於各個接口的進行監控。