Go項目實戰:打造高並發日志采集系統(一)


項目結構

本系列文章意在記錄如何搭建一個高可用的日志采集系統,實際項目中會有多個日志文件分布在服務器各個文件夾,這些日志記錄了不同的功能。隨着業務的增多,日志文件也再增多,企業中常常需要實現一個獨立的日志采集系統,實時采集各個日志信息,並記錄和輸出到控制台或網頁上,方便監控和查詢。
本文日志采集系統架構如下

1.jpg
日志采集系統監控各個日志文件,當日志文件有日志錄入時,日志采集系統實時獲取日志內容並下入kafka隊列中,之后可以實現Web端從kafaka取出信息,並前端顯示。也可以將kafka的信息控制台輸出,這個主要是看具體需求。本系列文章主要講述如何搭建kafaka服務,編寫高並發日志采集系統,穩定高效錄入信息,以及從kafka中讀取采集的日志。

本節目標

1 配置kafka,並啟動消息隊列。
2 編寫代碼向kafka錄入消息,並且從kafka讀取消息。

kafka簡介和搭建

Kafka是一種高吞吐量的分布式發布訂閱消息系統,由Java編寫,內部使用了zookeeper(分布式應用程序協調服務),所以安裝Kafka之前需要先安裝jdk和zookeeper。

JDK安裝

去官網https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html下載jdk,按步驟安裝。之后配置環境變量即可。

Zookeeper安裝

這里說下windows安裝流程,linux類似。
從網址http://zookeeper.apache.org/releases.html下載zookeeper,之后解壓即可使用。我在windows創建了一個文件夾D:\kafkazookeeper,將zookeeper解壓到該文件夾。打開D:\kafkazookeeper\zookeeper-3.4.14\conf,把zoo_sample.cfg復制一份命名為zoo.cfg,從文本編輯器里打開zoo.cfg修改如下內容

dataDir=D:\\kafkazookeeper\\zookeeper-3.4.14\\data
dataLogDir=D:\\kafkazookeeper\\zookeeper-3.4.14\\log

目錄根據你個人設置就行了。接下來添加如下環境變量 


ZOOKEEPER_HOME: D:\kafkazookeeper\zookeeper-3.4.14 Path: 在現有的值后面添加 ";%ZOOKEEPER_HOME%\bin;

 

ZOOKEEPER_HOME值就是你的kafka安裝目錄。接下來進入D:\kafkazookeeper\zookeeper-3.4.14\bin啟動zkServer.cmd
看到zookeeper服務跑起來了,默認端口為2181,不要關閉。

kafka安裝

下載地址http://kafka.apache.org/downloads.html
將其解壓到我自己的D:\kafkazookeeper目錄下,打開D:\kafkazookeeper\kafka_2.12-2.2.0\config修改log.dirs,設置為

log.dirs=D:\\kafkazookeeper\\kafka_2.12-2.2.0\\logs

在kafka目錄里執行如下命令,啟動kafka 

.\bin\windows\kafka-server-start.bat .\config\server.properties

測試kafka 

創建topics

在kafka目錄里執行如下命令

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

這樣我們創建了一個主題,這個主題相當於一個標簽,用於消息讀寫。 

打開一個Producer

同樣在kafka目錄下執行

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

這樣我們基於test主題啟動了一個生產者 

打開一個Consumer

同樣在kafka目錄下執行

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

我們在生產者窗口寫一些消息注入hello consumer,消費者窗口會取出消息並顯示 hello consumer 

2.jpg

實現代碼向kafka寫入消息

func main() {
	config := sarama.NewConfig()
	// 等待服務器所有副本都保存成功后的響應
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 隨機的分區類型:返回一個分區器,該分區器每次選擇一個隨機分區
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 是否等待成功和失敗后的響應
	config.Producer.Return.Successes = true
	// 使用給定代理地址和配置創建一個同步生產者
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()
	//構建發送的消息,
	msg := &sarama.ProducerMessage{
		//Topic: "test",//包含了消息的主題
		Partition: int32(10),                   //
		Key:       sarama.StringEncoder("key"), //
	}
	inputReader := bufio.NewReader(os.Stdin)
	for{
		value, _ , err := inputReader.ReadLine()
    	if err != nil {
        	fmt.Printf("error:", err.Error())
        	return
    	}
		msgType , _, err  := inputReader.ReadLine()
		msg.Topic = string(msgType)
		fmt.Println("topic is : ",msg.Topic)
		fmt.Println("value is : ",string(value))
		msg.Value = sarama.ByteEncoder(value)
        partition, offset, err := producer.SendMessage(msg)

        if err != nil {
			fmt.Println("Send message Fail")
			fmt.Println(err.Error())
        }
        fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
	}
}

上述代碼基於本地端口9092創建了生產者,然后構造了消息的分區大小以及Key值,接下來循環讀取終端錄入信息,第一行為value,第二行為topic,然后將消息發送到kafka,並且打印存儲的分區和位移。
我們運行我們的程序,錄入消息,可以看到消息發送到kafka后被消費者獲取。
3.jpg
下一篇,我們完善消費者程序,並且實現文件監控和讀取

謝謝關注我的公眾號


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM