參考網址:https://github.com/doujiang24/lua-resty-kafka
一、例子
content_by_lua '
-- 引入lua所有api
local cjson = require "cjson"
local producer = require "resty.kafka.producer"
-- 定義kafka broker地址,ip需要和kafka的host.name配置一致
local broker_list = {
{ host = "192.168.101.223", port = 9092 },
{ host = "192.168.101.224", port = 9092 }
}
local key = "key"
local message = "halo world"
local error_handle = function (topic, partition_id, queue, index, err, retryable)
ngx.log(ngx.ERR, "failed to send to kafka, topic: ", topic, "; partition_id: ", partition_id, "; retryable: ", retryable)
end
local p = producer:new(broker_list, { producer_type = "async", max_retry = 1, batch_num = 1, error_handle = error_handle })
local ok, err = p:send("test", key, message)
if not ok then
ngx.say("send err:", err)
return
end
ngx.say("send ok:", ok)
p:flush()
';
二、語法
New
p = producer:new(
broker_list
, producer_config?, cluster_name?
)
broker_list
:客戶端列表
local broker_list = {
{ host = "192.168.101.223", port = 9092 },
{ host = "192.168.101.224", port = 9092 }
}
producer_config?
:可選參數
producer_type
:
生產者類型,同步或者異步,
"async" or "sync"
request_timeout
:
請求超時,默認是 2000 ms
required_acks
:
請求應答
,
不能為0,默認是1
max_retry
:
信息發送重試
最大次數,默認是3
retry_backoff
:
信息發送重試補償,默認100ms
partitioner
:
選擇分區從鍵和分區num的分區。
local partitioner = function (
key, partition_num, correlation_id) end
默認
local function default_partitioner(key, num, correlation_id)
local id = key and crc32(key) or correlation_id
-- partition_id is continuous and start from 0
return id % num
end
!!!以下為緩沖區參數,只有producer_type
= "async"有效
flush_time
:
隊列最大緩存時間,默認1000ms
batch_num
:
隊列最大批次數量,默認200
batch_size
:
保存緩存大小,默認是1M(最大可以為2M),需要小心,這個跟kafka配置有關,socket.request.max.byts為2-10k
max_buffering
:
隊列最大緩存大小,默認50000
error_handle
:
錯誤處理,當緩沖區發送到kafka錯誤時處理數據。
error_handle = function (topic, partition_id, message_queue, index, err, retryable) end
失敗的消息隊列如{ key1, msg1, key2, msg2 },鍵值key在消息隊列中是為””相當與orign中的nil,
Index
為消息隊列的長度
Retryable
為true時,這意味着kafka服務器肯定沒有提交這些消息。可以嘗試重試發送消息,
暫時不支持壓縮
cluster_name?
:可選參數
指定集群的名稱,默認是1(該參數是一個數值),當您有兩個或多個kafka集群時,您可以指定不同的名稱,只有producer_type = "async"有效
Send
ok, err = p:send(topic, key, message)
同步模式時 :
如果成功,返回當前代理和分區的偏移量(** cdata: LL **)。如果出現錯誤,返回nil,用字符串描述錯誤
異步模式時 :
消息將首先寫入緩沖區。當緩沖區超過batch_num時,它將發送到kafka服務器,或者每個flush_time刷新緩沖區。