簡介
在很多數據采集場景下,Flume作為一個高性能采集日志的工具,相信大家都知道它。許多人想起Flume這個組件能聯想到的大多數都是Flume跟Kafka相結合進行日志的采集,這種方案有很多他的優點,比如高性能、高吞吐、數據可靠性等。但是我們如果要求對日志進行實時的采集,這顯然不是一個好的解決方案。原因如下:
就目前來說,Flume能支持實時監控一個目錄的數據文件,一旦對某個目錄的文件采集完成,就會打上completed的標志,若之后再有數據進入這個文件中,Flume則不會檢測到。
所以,我們更多的是使用這種方案進行定時采集,只要有一個新的數據目錄產生,我們就采集這個目錄下的數據文件。
那么接下來本篇文章將為大家介紹基於Openresty+Lua+Kafka對日志進行實時的采集。
需求
很多時候,我們需要對用戶的埋點數據進行一個實時的采集,然后用這些數據對用戶的行為做一些實時的分析。所以,第一步當然是先解決怎樣對數據進行實時的采集。
這里我們用到的方案是Openresty+Lua+Kafka。
原理介紹
那么什么是Openresty呢?這里引用官方的一段話:
OpenResty是一個基於Nginx與Lua的高性能Web平台,其內部集成了大量精良的Lua庫、第三方模塊以及大多數的依賴項。用於方便地搭建能夠處理超高並發、擴展性極高的動態 Web 應用、Web 服務和動態網關。
OpenResty通過匯聚各種設計精良的Nginx模塊,從而將Nginx有效地變成一個強大的通用Web應用平台。這樣,Web開發人員和系統工程師可以使用Lu 腳本語言調動Nginx支持的各種C以及Lua模塊,快速構造出足以勝任10K乃至1000 以上單機並發連接的高性能Web應用系統。
OpenResty的目標是讓你的Web服務直接跑在Nginx服務內部,充分利用Nginx的非阻塞 I/O 模型,不僅僅對 HTTP 客戶端請求,甚至於對遠程后端諸如MySQL、PostgreSQL、Memcached 以及 Redis 等都進行一致的高性能響應。
簡單來說,就是將客戶端的請求(本文指的是用戶的行為日志)通過Nginx把用戶的數據投遞到我們指定的地方(Kafka),而為了實現這個需求,我們用到了Lua腳本,因為Openresty封裝了各種Lua模塊,其中有一個模塊就是對Kafka模塊進行了分裝,我們只需要寫一個簡單的腳本就可以將用戶的數據通過Nginx轉發到Kafka中,以便后續對數據進行消費。
這里給出一張架構圖,方便大家理解:

在這里簡單總結一下使用Openresty+Lua+Kafka的優點:
1.支持多種業務數據,不同的業務數據,只需要配置不同的Lua腳本,就可以將不同的業務數據發送到Kafka不同的topic中。
2.對用戶觸發的埋點數據進行實時的采集
3.高可靠的集群,Openresty由於是基於Nginx,其集群擁有非常高的性能和穩定性。
4.高並發,相比tomcat、apache等web服務器,Nginx的並發量遠遠高於其他兩種。正常情況下處理上萬的並發量都不是什么難事。
那么接下來我們就動手實操一下。
Openresty的安裝
本實例采用的單機部署形式,當單機部署成功了之后,集群的搭建跟單機一樣,只是在不同的機器上執行相同的步驟而已。
注:本實驗基於centos7.0操作系統
1.下載Openresty依賴:
yum install readline-devel pcre-devel openssl-devel gcc
2.編譯安裝Openresty:
#1.安裝openresty:
mkdir /opt/software
mkdir /opt/module
cd /opt/software/ # 安裝文件所在目錄
wget https://openresty.org/download/openresty-1.9.7.4.tar.gz
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4
#2.配置:
# 指定目錄為/opt/openresty,默認在/usr/local。
./configure --prefix=/opt/openresty \
--with-luajit \
--without-http_redis2_module \
--with-http_iconv_module
make
make install
3.安裝lua-resty-kafka
因為我們需要將數據通過nginx+lua腳本轉發到Kafka中,編寫lua腳本時需要用到lua模塊中的一些關於Kafka的依賴。
#下載lua-resty-kafka:
cd /opt/software/
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
unzip master.zip -d /opt/module/
#拷貝kafka相關依賴腳本到openresty
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/
注:由於kafka大家都比較熟知,這里就不介紹它的安裝了。
Openresty安裝完成之后目錄結構如下:
drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx
4.配置文件
編輯/opt/openresty/nginx/conf/nginx.conf
user nginx; #Linux的用戶
worker_processes auto;
worker_rlimit_nofile 100000;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 102400;
multi_accept on;
use epoll;
}
http {
include mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
resolver 8.8.8.8;
#resolver 127.0.0.1 valid=3600s;
sendfile on;
keepalive_timeout 65;
underscores_in_headers on;
gzip on;
include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個文件名字可自定義
}
編輯 /opt/openresty/nginx/conf/conf.d/common.conf
##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";
lua_shared_dict ngx_cache 128m; # cache
lua_shared_dict cache_lock 100k; # lock for cache
server {
listen 8887; #監聽端口
server_name 192.168.3.215; #埋點日志的ip地址或域名,多個域名之間用空格分開
root html; #root指令用於指定虛擬主機的網頁根目錄,這個目錄可以是相對路徑,也可以是絕對路徑。
lua_need_request_body on; #打開獲取消息體的開關,以便能獲取到消息體
access_log /var/log/nginx/message.access.log main;
error_log /var/log/nginx/message.error.log notice;
location = /lzp/message {
lua_code_cache on;
charset utf-8;
default_type 'application/json';
content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua腳本
}
}
編輯 /opt/openresty/nginx/lua/testMessage_kafka.lua
#創建目錄mkdir /opt/openresty/nginx/lua/ vim /opt/openresty/nginx/lua/testMessage_kafka.lua
#編輯內存如下:
-- require需要resty.kafka.producer的lua腳本,沒有會報錯
local producer = require("resty.kafka.producer")
-- kafka的集群信息,單機也是可以的
local broker_list = {
{host = "192.168.3.215", port = 9092},
}
-- 定義最終kafka接受到的數據是怎樣的json格式
local log_json = {}
--增加read_body之后即可獲取到消息體,默認情況下可能會是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()
-- 定義kafka同步生產者,也可設置為異步 async
-- -- 注意!!!當設置為異步時,在測試環境需要修改batch_num,默認是200條,若大不到200條kafka端接受不到消息
-- -- encode()將log_json日志轉換為字符串
-- -- 發送日志消息,send配套之第一個參數topic:
-- -- 發送日志消息,send配套之第二個參數key,用於kafka路由控制:
-- -- key為nill(空)時,一段時間向同一partition寫入數據
-- -- 指定key,按照key的hash寫入到對應的partition
-- -- batch_num修改為1方便測試
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)
local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
ngx.say("the message send successful")
else
ngx.say("未知錯誤")
end
5.啟動服務運行:
useradd nginx #創建用戶 passwd nginx #設置密碼 #設置openresty的所有者nginx chown -R nginx:nginx /opt/openresty/ #啟動服務 cd /opt/openresty/nginx/sbin ./nginx -c /opt/openresty/nginx/conf/nginx.conf 查看服務: ps -aux | grep nginx nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf 查看端口: netstat -anput | grep 8887 tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke
看到以上進程,證明服務已正常運行
6.使用postman,發送post請求進行簡單的測試,查看kafka是否能否接受到數據

7.kafka消費數據:
kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning
若消費到數據,則證明配置成功,若未調通可查看/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關錯誤日志進行調整
總結
使用Openresty+Lua+Kafka就可以將用戶的埋點數據實時采集到kafka集群中,並且Openresty是基於Nginx的,而Nginx能處理上萬的並發量,所以即使用戶的數據在短時間內激增,這套架構也能輕松的應對,不會導致集群崩潰。另一方面,若數據過多導致集群的超負荷,我們也可以隨時加多一台機器,非常方便。
另外一個小小的拓展:若業務數據非常多,需要發送到不同的topic中,我們也不用編寫多個腳本,而是可以聯系后端在json格式里面加一個字段,這個字段的值就是topic的名稱。我們只需要編寫一個通用腳本,解析Json數據將topic名稱拿出來就可以了。
