Openresty+Lua+Kafka實現日志實時采集


簡介

  在很多數據采集場景下,Flume作為一個高性能采集日志的工具,相信大家都知道它。許多人想起Flume這個組件能聯想到的大多數都是Flume跟Kafka相結合進行日志的采集,這種方案有很多他的優點,比如高性能、高吞吐、數據可靠性等。但是我們如果要求對日志進行實時的采集,這顯然不是一個好的解決方案。原因如下:

  就目前來說,Flume能支持實時監控一個目錄的數據文件,一旦對某個目錄的文件采集完成,就會打上completed的標志,若之后再有數據進入這個文件中,Flume則不會檢測到。

  所以,我們更多的是使用這種方案進行定時采集,只要有一個新的數據目錄產生,我們就采集這個目錄下的數據文件。

  那么接下來本篇文章將為大家介紹基於Openresty+Lua+Kafka對日志進行實時的采集。

需求

  很多時候,我們需要對用戶的埋點數據進行一個實時的采集,然后用這些數據對用戶的行為做一些實時的分析。所以,第一步當然是先解決怎樣對數據進行實時的采集。

  這里我們用到的方案是Openresty+Lua+Kafka。

原理介紹

  那么什么是Openresty呢?這里引用官方的一段話: 

  OpenResty是一個基於Nginx與Lua的高性能Web平台,其內部集成了大量精良的Lua庫、第三方模塊以及大多數的依賴項。用於方便地搭建能夠處理超高並發、擴展性極高的動態 Web 應用、Web 服務和動態網關。
  OpenResty通過匯聚各種設計精良的Nginx模塊,從而將Nginx有效地變成一個強大的通用Web應用平台。這樣,Web開發人員和系統工程師可以使用Lu 腳本語言調動Nginx支持的各種C以及Lua模塊,快速構造出足以勝任10K乃至1000 以上單機並發連接的高性能Web應用系統。
  OpenResty的目標是讓你的Web服務直接跑在Nginx服務內部,充分利用Nginx的非阻塞 I/O 模型,不僅僅對 HTTP 客戶端請求,甚至於對遠程后端諸如MySQL、PostgreSQL、Memcached 以及 Redis 等都進行一致的高性能響應。

  簡單來說,就是將客戶端的請求(本文指的是用戶的行為日志)通過Nginx把用戶的數據投遞到我們指定的地方(Kafka),而為了實現這個需求,我們用到了Lua腳本,因為Openresty封裝了各種Lua模塊,其中有一個模塊就是對Kafka模塊進行了分裝,我們只需要寫一個簡單的腳本就可以將用戶的數據通過Nginx轉發到Kafka中,以便后續對數據進行消費。

  這里給出一張架構圖,方便大家理解:

 

  

  在這里簡單總結一下使用Openresty+Lua+Kafka的優點:

    1.支持多種業務數據,不同的業務數據,只需要配置不同的Lua腳本,就可以將不同的業務數據發送到Kafka不同的topic中。

    2.對用戶觸發的埋點數據進行實時的采集

    3.高可靠的集群,Openresty由於是基於Nginx,其集群擁有非常高的性能和穩定性。

    4.高並發,相比tomcat、apache等web服務器,Nginx的並發量遠遠高於其他兩種。正常情況下處理上萬的並發量都不是什么難事。

  那么接下來我們就動手實操一下。

Openresty的安裝

本實例采用的單機部署形式,當單機部署成功了之后,集群的搭建跟單機一樣,只是在不同的機器上執行相同的步驟而已。

注:本實驗基於centos7.0操作系統

1.下載Openresty依賴:

yum install readline-devel pcre-devel openssl-devel gcc 

2.編譯安裝Openresty:

#1.安裝openresty: 
mkdir /opt/software 
mkdir /opt/module
cd /opt/software/ # 安裝文件所在目錄  
wget https://openresty.org/download/openresty-1.9.7.4.tar.gz  
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4 
#2.配置:  
# 指定目錄為/opt/openresty,默認在/usr/local。  
./configure --prefix=/opt/openresty \  
            --with-luajit \  
            --without-http_redis2_module \  
            --with-http_iconv_module  
make  
make install  

3.安裝lua-resty-kafka

因為我們需要將數據通過nginx+lua腳本轉發到Kafka中,編寫lua腳本時需要用到lua模塊中的一些關於Kafka的依賴。

#下載lua-resty-kafka:
cd /opt/software/  
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip  
unzip master.zip -d /opt/module/  
    
#拷貝kafka相關依賴腳本到openresty  
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/

 注:由於kafka大家都比較熟知,這里就不介紹它的安裝了。

Openresty安裝完成之后目錄結構如下:

drwxr-xr-x  2 root root 4096 Mar 24 14:26 bin
drwxr-xr-x  6 root root 4096 Mar 24 14:26 luajit
drwxr-xr-x  7 root root 4096 Mar 24 14:29 lualib
drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx

4.配置文件

編輯/opt/openresty/nginx/conf/nginx.conf

user  nginx;  #Linux的用戶
worker_processes  auto;
worker_rlimit_nofile 100000;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {
    worker_connections  102400;
    multi_accept on;
    use epoll;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;

    resolver 8.8.8.8;
    #resolver 127.0.0.1 valid=3600s;

    sendfile        on;

    keepalive_timeout  65;

    underscores_in_headers on;

    gzip  on;

    include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個文件名字可自定義

}

 編輯 /opt/openresty/nginx/conf/conf.d/common.conf

##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";

lua_shared_dict ngx_cache 128m;  # cache
lua_shared_dict cache_lock 100k; # lock for cache

server {
    listen       8887; #監聽端口
    server_name  192.168.3.215; #埋點日志的ip地址或域名,多個域名之間用空格分開
    root         html; #root指令用於指定虛擬主機的網頁根目錄,這個目錄可以是相對路徑,也可以是絕對路徑。
    lua_need_request_body on; #打開獲取消息體的開關,以便能獲取到消息體

    access_log /var/log/nginx/message.access.log  main;
    error_log  /var/log/nginx/message.error.log  notice;

    location = /lzp/message {
        lua_code_cache on;
        charset utf-8;
        default_type 'application/json';
        content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua腳本
    }
}

 編輯 /opt/openresty/nginx/lua/testMessage_kafka.lua

#創建目錄mkdir /opt/openresty/nginx/lua/
vim /opt/openresty/nginx/lua/testMessage_kafka.lua
#編輯內存如下:
-- require需要resty.kafka.producer的lua腳本,沒有會報錯
local producer = require("resty.kafka.producer")

-- kafka的集群信息,單機也是可以的
local broker_list = {
    {host = "192.168.3.215", port = 9092},
}

-- 定義最終kafka接受到的數據是怎樣的json格式
local log_json = {}
--增加read_body之后即可獲取到消息體,默認情況下可能會是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()

-- 定義kafka同步生產者,也可設置為異步 async
-- -- 注意!!!當設置為異步時,在測試環境需要修改batch_num,默認是200條,若大不到200條kafka端接受不到消息
-- -- encode()將log_json日志轉換為字符串
-- -- 發送日志消息,send配套之第一個參數topic:
-- -- 發送日志消息,send配套之第二個參數key,用於kafka路由控制:
-- -- key為nill(空)時,一段時間向同一partition寫入數據
-- -- 指定key,按照key的hash寫入到對應的partition

-- -- batch_num修改為1方便測試
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)

local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
   ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
   ngx.say("the message send successful")
else
   ngx.say("未知錯誤")
end

5.啟動服務運行:

useradd nginx #創建用戶
passwd nginx #設置密碼

#設置openresty的所有者nginx
chown -R nginx:nginx /opt/openresty/

#啟動服務
cd /opt/openresty/nginx/sbin
./nginx -c /opt/openresty/nginx/conf/nginx.conf

查看服務:
ps -aux | grep nginx
nginx     2351  0.0  0.1 231052 46444 ?        S    Mar30   0:33 nginx: worker process
nginx     2352  0.0  0.1 233396 48540 ?        S    Mar30   0:35 nginx: worker process
nginx     2353  0.0  0.1 233396 48536 ?        S    Mar30   0:33 nginx: worker process
nginx     2354  0.0  0.1 232224 47464 ?        S    Mar30   0:34 nginx: worker process
nginx     2355  0.0  0.1 231052 46404 ?        S    Mar30   0:33 nginx: worker process
nginx     2356  0.0  0.1 232224 47460 ?        S    Mar30   0:34 nginx: worker process
nginx     2357  0.0  0.1 231052 46404 ?        S    Mar30   0:34 nginx: worker process
nginx     2358  0.0  0.1 232224 47484 ?        S    Mar30   0:34 nginx: worker process
root      7009  0.0  0.0 185492  2516 ?        Ss   Mar24   0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf


查看端口:
netstat -anput | grep 8887
tcp        0      0 0.0.0.0:8887            0.0.0.0:*               LISTEN      2351/nginx: worke

看到以上進程,證明服務已正常運行

6.使用postman,發送post請求進行簡單的測試,查看kafka是否能否接受到數據

 

 

 7.kafka消費數據:

kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning

若消費到數據,則證明配置成功,若未調通可查看/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關錯誤日志進行調整

總結

  使用Openresty+Lua+Kafka就可以將用戶的埋點數據實時采集到kafka集群中,並且Openresty是基於Nginx的,而Nginx能處理上萬的並發量,所以即使用戶的數據在短時間內激增,這套架構也能輕松的應對,不會導致集群崩潰。另一方面,若數據過多導致集群的超負荷,我們也可以隨時加多一台機器,非常方便。

  另外一個小小的拓展:若業務數據非常多,需要發送到不同的topic中,我們也不用編寫多個腳本,而是可以聯系后端在json格式里面加一個字段,這個字段的值就是topic的名稱。我們只需要編寫一個通用腳本,解析Json數據將topic名稱拿出來就可以了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM