lua-resty-kafka配置文檔


參考網址:https://github.com/doujiang24/lua-resty-kafka

 

一、例子

content_by_lua '

          -- 引入lua所有api

         local cjson = require "cjson"

         local producer = require "resty.kafka.producer"

 

         -- 定義kafka broker地址,ip需要和kafka的host.name配置一致 

         local broker_list = {

             { host = "192.168.101.223", port = 9092 }, 

             { host = "192.168.101.224", port = 9092 }

         }

         local key = "key"

         local message = "halo world"

         local error_handle = function (topic, partition_id, queue, index, err, retryable)

                  ngx.log(ngx.ERR, "failed to send to kafka, topic: ", topic, "; partition_id: ", partition_id, "; retryable: ", retryable)

         end

 

         local p = producer:new(broker_list, { producer_type = "async", max_retry = 1, batch_num = 1, error_handle = error_handle })

 

         local ok, err = p:send("test", key, message)

         if not ok then

                  ngx.say("send err:", err)

             return

         end

         ngx.say("send ok:", ok)

         p:flush()

';

二、語法

New

p = producer:new(broker_list, producer_config?, cluster_name?)

 

broker_list :客戶端列表

    local broker_list = {

             { host = "192.168.101.223", port = 9092 }, 

             { host = "192.168.101.224", port = 9092 }

         }

 

producer_config? :可選參數

 

    producer_type 生產者類型,同步或者異步,"async" or "sync"

         request_timeout 請求超時,默認是 2000 ms

         required_acks 請求應答不能為0,默認是1

    max_retry 信息發送重試最大次數,默認是3

    retry_backoff 信息發送重試補償,默認100ms

    partitioner 選擇分區從鍵和分區num的分區。

       local partitioner = function (key, partition_num, correlation_id) end

       默認

         local function default_partitioner(key, num, correlation_id)

               local id = key and crc32(key) or correlation_id

               -- partition_id is continuous and start from 0

               return id % num

        end

 

         !!!以下為緩沖區參數,只有producer_type = "async"有效

         flush_time 隊列最大緩存時間,默認1000ms

    batch_num 隊列最大批次數量,默認200

    batch_size 保存緩存大小,默認是1M(最大可以為2M),需要小心,這個跟kafka配置有關,socket.request.max.byts為2-10k

    max_buffering 隊列最大緩存大小,默認50000

    error_handle 錯誤處理,當緩沖區發送到kafka錯誤時處理數據。

                   error_handle = function (topic, partition_id, message_queue, index, err, retryable) end

       失敗的消息隊列如{ key1, msg1, key2, msg2 },鍵值key在消息隊列中是為””相當與orign中的nil,

       Index為消息隊列的長度

       Retryable為true時,這意味着kafka服務器肯定沒有提交這些消息。可以嘗試重試發送消息,

    暫時不支持壓縮

 

cluster_name? :可選參數

    指定集群的名稱,默認是1(該參數是一個數值),當您有兩個或多個kafka集群時,您可以指定不同的名稱,只有producer_type = "async"有效

 

 

 

Send

ok, err = p:send(topic, key, message)

    同步模式時 :

       如果成功,返回當前代理和分區的偏移量(** cdata: LL **)。如果出現錯誤,返回nil,用字符串描述錯誤

         異步模式時 :

       消息將首先寫入緩沖區。當緩沖區超過batch_num時,它將發送到kafka服務器,或者每個flush_time刷新緩沖區。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM