項目結構
本系列文章意在記錄如何搭建一個高可用的日志采集系統,實際項目中會有多個日志文件分布在服務器各個文件夾,這些日志記錄了不同的功能。隨着業務的增多,日志文件也再增多,企業中常常需要實現一個獨立的日志采集系統,實時采集各個日志信息,並記錄和輸出到控制台或網頁上,方便監控和查詢。
本文日志采集系統架構如下
日志采集系統監控各個日志文件,當日志文件有日志錄入時,日志采集系統實時獲取日志內容並下入kafka隊列中,之后可以實現Web端從kafaka取出信息,並前端顯示。也可以將kafka的信息控制台輸出,這個主要是看具體需求。本系列文章主要講述如何搭建kafaka服務,編寫高並發日志采集系統,穩定高效錄入信息,以及從kafka中讀取采集的日志。
本節目標
1 配置kafka,並啟動消息隊列。
2 編寫代碼向kafka錄入消息,並且從kafka讀取消息。
kafka簡介和搭建
Kafka是一種高吞吐量的分布式發布訂閱消息系統,由Java編寫,內部使用了zookeeper(分布式應用程序協調服務),所以安裝Kafka之前需要先安裝jdk和zookeeper。
JDK安裝
去官網https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html下載jdk,按步驟安裝。之后配置環境變量即可。
Zookeeper安裝
這里說下windows安裝流程,linux類似。
從網址http://zookeeper.apache.org/releases.html下載zookeeper,之后解壓即可使用。我在windows創建了一個文件夾D:\kafkazookeeper,將zookeeper解壓到該文件夾。打開D:\kafkazookeeper\zookeeper-3.4.14\conf,把zoo_sample.cfg復制一份命名為zoo.cfg,從文本編輯器里打開zoo.cfg修改如下內容
dataDir=D:\\kafkazookeeper\\zookeeper-3.4.14\\data dataLogDir=D:\\kafkazookeeper\\zookeeper-3.4.14\\log
目錄根據你個人設置就行了。接下來添加如下環境變量
ZOOKEEPER_HOME: D:\kafkazookeeper\zookeeper-3.4.14
Path: 在現有的值后面添加 ";%ZOOKEEPER_HOME%\bin;
ZOOKEEPER_HOME值就是你的kafka安裝目錄。接下來進入D:\kafkazookeeper\zookeeper-3.4.14\bin啟動zkServer.cmd
看到zookeeper服務跑起來了,默認端口為2181,不要關閉。
kafka安裝
下載地址http://kafka.apache.org/downloads.html
將其解壓到我自己的D:\kafkazookeeper目錄下,打開D:\kafkazookeeper\kafka_2.12-2.2.0\config修改log.dirs,設置為
log.dirs=D:\\kafkazookeeper\\kafka_2.12-2.2.0\\logs
在kafka目錄里執行如下命令,啟動kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
測試kafka
創建topics
在kafka目錄里執行如下命令
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
這樣我們創建了一個主題,這個主題相當於一個標簽,用於消息讀寫。
打開一個Producer
同樣在kafka目錄下執行
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
這樣我們基於test主題啟動了一個生產者
打開一個Consumer
同樣在kafka目錄下執行
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
我們在生產者窗口寫一些消息注入hello consumer,消費者窗口會取出消息並顯示 hello consumer
實現代碼向kafka寫入消息
func main() { config := sarama.NewConfig() // 等待服務器所有副本都保存成功后的響應 config.Producer.RequiredAcks = sarama.WaitForAll // 隨機的分區類型:返回一個分區器,該分區器每次選擇一個隨機分區 config.Producer.Partitioner = sarama.NewRandomPartitioner // 是否等待成功和失敗后的響應 config.Producer.Return.Successes = true // 使用給定代理地址和配置創建一個同步生產者 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } defer producer.Close() //構建發送的消息, msg := &sarama.ProducerMessage{ //Topic: "test",//包含了消息的主題 Partition: int32(10), // Key: sarama.StringEncoder("key"), // } inputReader := bufio.NewReader(os.Stdin) for{ value, _ , err := inputReader.ReadLine() if err != nil { fmt.Printf("error:", err.Error()) return } msgType , _, err := inputReader.ReadLine() msg.Topic = string(msgType) fmt.Println("topic is : ",msg.Topic) fmt.Println("value is : ",string(value)) msg.Value = sarama.ByteEncoder(value) partition, offset, err := producer.SendMessage(msg) if err != nil { fmt.Println("Send message Fail") fmt.Println(err.Error()) } fmt.Printf("Partition = %d, offset=%d\n", partition, offset) } }
上述代碼基於本地端口9092創建了生產者,然后構造了消息的分區大小以及Key值,接下來循環讀取終端錄入信息,第一行為value,第二行為topic,然后將消息發送到kafka,並且打印存儲的分區和位移。
我們運行我們的程序,錄入消息,可以看到消息發送到kafka后被消費者獲取。
下一篇,我們完善消費者程序,並且實現文件監控和讀取
。
謝謝關注我的公眾號