Go項目實戰:打造高並發日志采集系統(二)


日志統計系統的整體思路就是監控各個文件夾下的日志,實時獲取日志寫入內容並寫入kafka隊列,寫入kafka隊列可以在高並發時排隊,而且達到了邏輯解耦合的目的。然后從kafka隊列中讀出數據,根據實際需求顯示網頁上或者控制台等。

前情提要

上一節我們完成了如下目標
1 配置kafka,並啟動消息隊列。
2 編寫代碼向kafka錄入消息,並且從kafka讀取消息。

本節目標

1 寫代碼從kafka中讀取消息,保證kafka消息讀寫功能無誤。
2 借助tailf實現文件監控,並模擬測試事實寫文件以及文件備份時功能無誤。
3 本系列文章開發語言使用Go

從kafka中讀取消息

func main(){
	fmt.Println("consumer begin...")
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	wg  :=sync.WaitGroup{}
	//創建消費者
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"},config)
	if err != nil {
		fmt.Println("consumer create failed, error is ", err.Error())
		return
	}
	defer consumer.Close()
	
	//Partitions(topic):該方法返回了該topic的所有分區id
    partitionList, err := consumer.Partitions("test")
    if err != nil {
		fmt.Println("get consumer partitions failed")
		fmt.Println("error is ", err.Error())
		return
    }

	for partition := range partitionList {
		//ConsumePartition方法根據主題,
		//分區和給定的偏移量創建創建了相應的分區消費者
		//如果該分區消費者已經消費了該信息將會返回error
		//OffsetNewest消費最新數據
        pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
        if err != nil {
            panic(err)
		}
		//異步關閉,保證數據落盤
        defer pc.AsyncClose()
        wg.Add(1)
        go func(sarama.PartitionConsumer) {
            defer wg.Done()
            //Messages()該方法返回一個消費消息類型的只讀通道,由代理產生
            for msg := range pc.Messages() {
				fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", 
				msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    wg.Wait()
    consumer.Close()
	
}

  

這樣我們啟動zookeeper和kafka后,分別運行前文實現的向kafka中寫入數據的代碼,以及現在的從kafka中消費的代碼,看到如下效果
1.jpg

實現文件監控

實現文件監控,主要是在文件中有內容寫入時,程序可以及時獲取寫入的內容,類似於Linux命令中的tailf -f 某個文件的功能。
golang 中提供了tail庫,我們借助這個庫完成指定文件的監控,我的文件組織如下
4.jpg

 logdir文件夾下的log.txt記錄的是不斷增加的日志文件
tailf文件夾下logtailf.go實現log.txt監控功能。
writefile文件夾下writefile.go實現的是向log.txt文件寫日志並備份的功能。

func main() {
	logrelative := `../logdir/log.txt`
	_, filename, _, _ := runtime.Caller(0)
	fmt.Println(filename)
	datapath := path.Join(path.Dir(filename), logrelative)
	fmt.Println(datapath)
	tailFile, err := tail.TailFile(datapath, tail.Config{
		//文件被移除或被打包,需要重新打開
		ReOpen: true,
		//實時跟蹤
		Follow: true,
		//如果程序出現異常,保存上次讀取的位置,避免重新讀取
		Location: &tail.SeekInfo{Offset: 0, Whence: 2},
		//支持文件不存在
		MustExist: false,
		Poll:      true,
	})

	if err != nil {
		fmt.Println("tail file err:", err)
		return
	}

	for true {
		msg, ok := <-tailFile.Lines
		if !ok {
			fmt.Printf("tail file close reopen, filename: %s\n", tailFile.Filename)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		//fmt.Println("msg:", msg)
		//只打印text
		fmt.Println("msg:", msg.Text)
	}
}

為了測試監控的功能。我們實現向log.txt中每隔0.1s寫入一行”Hello+時間戳”的日志。當寫入20條內容后我們將log.txt備份重命名。
然后創建新的log.txt繼續寫入。
在writefile.go實現一個函數定時寫入,並且備份功能

func writeLog(datapath string) {
	filew, err := os.OpenFile(datapath, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		fmt.Println("open file error ", err.Error())
		return
	}

	w := bufio.NewWriter(filew)
	for i := 0; i < 20; i++ {
		timeStr := time.Now().Format("2006-01-02 15:04:05")
		fmt.Fprintln(w, "Hello current time is "+timeStr)
		time.Sleep(time.Millisecond * 100)
		w.Flush()
	}
	logBak := time.Now().Format("20060102150405") + ".txt"
	logBak = path.Join(path.Dir(datapath), logBak)
	filew.Close()
	err = os.Rename(datapath, logBak)
	if err != nil {
		fmt.Println("Rename error ", err.Error())
		return
	}
}

  然后我們實現main函數,調用三次writeLog,這樣會產生三個備份文件

func main() {
	logrelative := `../logdir/log.txt`
	_, filename, _, _ := runtime.Caller(0)
	fmt.Println(filename)
	datapath := path.Join(path.Dir(filename), logrelative)
	for i := 0; i < 3; i++ {
		writeLog(datapath)
	}
}

 

我們分別啟動文件監控和文件寫入程序,效果如下
2.jpg
可以看到,當log.txt有內容寫入時,logtailf.go實現了動態監控,而且當文件備份時,logtailf.go提示了文件被重命名備份。
最終我們看到產生三個備份文件
3.jpg

總結

目前我們已經完成了kafka消息讀寫,文件監控,動態寫入和備份等功能,接下來我們實現項目的配置化和統籌代碼。
源碼下載
https://github.com/secondtonone1/golang-
感謝關注我的公眾號
wxgzh.jpg

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM