Functions
string rd_kafka_err2str ( integer $err ) Converts an rdkafka error code to a string
integer rd_kafka_errno2err ( integer $errnox ) Converts a system errno to a Kafka error code
integer rd_kafka_errno ( void ) Returns the system errno
integer rd_kafka_offset_tail ( integer $cnt ) Returns a special offset value that may be used to start consuming cnt messages before the topic's end (tail)
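A quick sketch of how these helpers are typically used (the consumeStart() call is covered under RdKafka\ConsumerTopic below):
// Print the human-readable description of an error code
echo rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT), "\n"; // "Local: Timed out"
// Special offset: start 100 messages before the end of the partition
$topic->consumeStart($partition, rd_kafka_offset_tail(100));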
RdKafka\KafkaConsumer class
This is the high-level consumer, with support for automatic partition assignment/revocation (PECL rdkafka >= 1.0.0, librdkafka >= 0.9).
1)public void RdKafka\KafkaConsumer::assign ([ array $topic_partitions = NULL ] )
Updates the assignment set to $topic_partitions. The default topic configuration for these topics can be changed by calling RdKafka\Conf::setDefaultTopicConf().
$kafkaConsumer->assign([
new RdKafka\TopicPartition("logs", 0),
new RdKafka\TopicPartition("logs", 1),
]);
2)public void RdKafka\KafkaConsumer::commit ([ mixed $message_or_offsets = NULL ] )
Commits offsets synchronously; blocks until the offsets are committed or the commit fails.
If a commit_cb callback has been registered, it will be invoked with the commit details on a later call.
Parameters
message_or_offsets
When NULL, commit offsets for the current assignment.
When a RdKafka\Message, commit offset for a single topic+partition based on the message.
When an array of RdKafka\TopicPartition, commit offsets for the provided list of partitions.
Errors/Exceptions
Throws RdKafka\Exception on errors.
Example:
// Commit offsets for the current assignment
$kafkaConsumer->commit();
// Commit offsets based on the message's topic, partition, and offset
$kafkaConsumer->commit($message);
// Commit offsets by providing a list of TopicPartition
$kafkaConsumer->commit([
new RdKafka\TopicPartition($topic, $partition, $offset),
]);
3)public void RdKafka\KafkaConsumer::commitAsync ([ mixed $message_or_offsets = NULL ] )
Commits offsets asynchronously. The message_or_offsets parameter is interpreted the same way as in RdKafka\KafkaConsumer::commit().
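A brief sketch (assuming $message is a RdKafka\Message returned by consume()):
// Fire-and-forget commit of the current assignment's offsets
$kafkaConsumer->commitAsync();
// Or commit the offset of a single consumed message
$kafkaConsumer->commitAsync($message);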
4)public RdKafka\KafkaConsumer::__construct ( RdKafka\Conf $conf )
Parameters
conf (RdKafka\Conf)
The conf object must have group.id set to the consumer group to join.
Example:
$conf = new RdKafka\Conf();
$conf->set("group.id", "myGroupID");
$kafkaConsumer = new RdKafka\KafkaConsumer($conf);
5)public RdKafka\Message RdKafka\KafkaConsumer::consume ( integer $timeout_ms )
Consumes a message, or gets an error event; triggers callbacks.
Registered callbacks for any such queued events will be called automatically, including rebalance_cb, event_cb, commit_cb, etc.
Parameters
timeout_ms (int) Timeout (milliseconds)
Return Values
Returns a RdKafka\Message. On error or timeout, RdKafka\Message::$err is != RD_KAFKA_RESP_ERR_NO_ERROR, and other properties should be ignored.
Note:
An application should make sure to call consume() at regular intervals, even if no messages are expected, to serve any queued callbacks waiting to be called. This is especially important when a rebalance_cb has been registered, as it needs to be called and handled properly to synchronize the internal consumer state.
while (true) {
$message = $kafkaConsumer->consume(3600e3);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
handle($message);
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timedout\n";
break;
default:
throw new \Exception($message->errstr());
break;
}
}
6)public array RdKafka\KafkaConsumer::getAssignment ( void )
Returns the current partition assignment as set by RdKafka\KafkaConsumer::assign() or by rebalancing.
Return Values
Returns an array of RdKafka\TopicPartition.
Errors/Exceptions
Throws RdKafka\Exception on errors.
7)public RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata ( bool $all_topics , RdKafka\KafkaConsumerTopic $only_topic = NULL , int $timeout_ms )
Requests metadata from the broker.
Parameters
all_topics (bool)
When TRUE, request info about all topics in the cluster. Else, only request info about locally known topics.
only_topic (RdKafka\KafkaConsumerTopic)
When non-null, only request info about this topic.
timeout_ms (int)
Timeout (milliseconds)
Return Values
Returns a RdKafka\Metadata instance
Example
$all = $kafkaConsumer->getMetadata(true, NULL, 60e3);
$local = $kafkaConsumer->getMetadata(false, NULL, 60e3);
$topic = $kafkaConsumer->newTopic("myTopic");
$one = $kafkaConsumer->getMetadata(false, $topic, 60e3);
8)public array RdKafka\KafkaConsumer::getSubscription ( void )
Returns the current subscription as set by RdKafka\KafkaConsumer::subscribe().
Return Values
Returns an array of topic names
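A minimal sketch:
$kafkaConsumer->subscribe(["logs"]);
var_dump($kafkaConsumer->getSubscription()); // ["logs"]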
9)public void RdKafka\KafkaConsumer::subscribe ( array $topics )
Updates the subscription set to $topics.
This replaces the current assignment; any previous subscription will first be unassigned and unsubscribed.
The subscription set denotes the desired topics to consume ...
The default topic configuration for subscribed topics can be changed by calling RdKafka\Conf::setDefaultTopicConf().
$kafkaConsumer->subscribe([
"logs",
"^myPfx[0-9]_.*",
]);
10)public void RdKafka\KafkaConsumer::unsubscribe ( void )
Unsubscribes from the current subscription set.
RdKafka\KafkaConsumerTopic class
(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)
This class represents a topic when using RdKafka\KafkaConsumer. It cannot be instantiated directly; RdKafka\KafkaConsumer::newTopic() should be used instead.
1)public void RdKafka\KafkaConsumerTopic::offsetStore ( integer $partition , integer $offset )
Store offset offset for topic partition partition. The offset will be committed (written) to the offset store according to auto.commit.interval.ms.
auto.commit.enable must be set to false when using this API.
Related configuration properties:
auto.commit.interval.ms: how often (in milliseconds) consumer offsets are committed to the offset store. (Old consumers committed offsets to ZooKeeper; since Kafka 0.9 offsets are stored on the broker by default.)
auto.offset.reset (default: largest): what to do when there is no initial offset in the offset store, or the offset is out of range:
smallest: automatically reset the offset to the smallest offset
largest: automatically reset the offset to the largest offset
anything else: throw an exception to the consumer
Parameters
partition (integer)
Partition ID
offset (integer)
Offset
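A minimal sketch of manual offset storage with the high-level consumer (assumes auto.commit.enable=false was set on the default topic conf; whether to store $message->offset or $message->offset + 1 depends on the librdkafka version, so check its documentation):
$topic = $kafkaConsumer->newTopic("logs");
$message = $kafkaConsumer->consume(10000);
if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
    // process the message, then store its offset; it is committed
    // in the background according to auto.commit.interval.ms
    $topic->offsetStore($message->partition, $message->offset);
}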
2)/* Inherited methods */
public string RdKafka\Topic::getName ( void )
RdKafka class
(PECL rdkafka >= 0.9.1)
This is the base class for the low-level clients RdKafka\Consumer and RdKafka\Producer. It cannot be instantiated directly; use one of the subclasses instead.
1)public integer RdKafka::addBrokers ( string $broker_list )
Adds one or more brokers to the Kafka handle's list of initial bootstrap brokers.
Additional brokers will be discovered automatically as soon as rdkafka connects to a broker, by querying the broker metadata.
If a broker name resolves to multiple addresses (and possibly address families), all of them will be used for connection attempts in round-robin fashion.
Return Values
Returns the number of brokers successfully added.
Brokers can also be defined with the metadata.broker.list or bootstrap.servers configuration property (the preferred method).
$kafka->addBrokers("broker1:10000,broker2");
$kafka->addBrokers("SSL://broker3:9000,ssl://broker2");
2)public RdKafka\Metadata RdKafka::getMetadata ( bool $all_topics , RdKafka\Topic $only_topic = NULL , int $timeout_ms )
Requests metadata from the broker (see the RdKafka\KafkaConsumer::getMetadata() example above).
3)public integer RdKafka::getOutQLen ( void )
Returns the current out queue length. The out queue contains messages waiting to be sent to, or acknowledged by, the broker.
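A common use, sketched here, is draining a producer before shutdown by polling until the out queue is empty (assuming $producer is an RdKafka\Producer instance):
// Serve callbacks until all queued messages have been sent/acknowledged
while ($producer->getOutQLen() > 0) {
    $producer->poll(50);
}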
4)public RdKafka\Queue RdKafka::newQueue ( void )
Creates a new message queue instance.
Return Values
Returns a RdKafka\Queue.
5)public RdKafka\Topic RdKafka::newTopic ( string $topic_name [, RdKafka\TopicConf $topic_conf = NULL ] )
Creates a new topic instance for topic_name.
Returns a RdKafka\Topic (more specifically, either a RdKafka\ConsumerTopic or a RdKafka\ProducerTopic).
Creating two topic instances for the same topic name with different configurations has no effect: each topic instance will use the configuration of the first one.
$conf = new RdKafka\TopicConf();
$conf->set("...", "...");
$topic = $kafka->newTopic("myTopic", $conf);
6)public void RdKafka::poll ( integer $timeout_ms )
Polls for events, causing application-provided callbacks to be invoked.
An application using an RdKafka subclass should make sure to call poll() at regular intervals to serve any queued callbacks waiting to be called.
Events:
Delivery report callbacks RdKafka\Conf::setDrMsgCb() [producer]
Error callbacks (RdKafka\Conf::setErrorCb())
Stats callbacks (RdKafka\Conf::setStatsCb())
Throttle callbacks (RdKafka\Conf::setThrottleCb())
Parameters
timeout_ms (integer)
Specifies the maximum amount of time (in milliseconds) that the call will block waiting for events. For non-blocking calls, provide 0 as timeout_ms. To wait indefinitely for an event, provide -1.
Return Values
Returns the number of events served.
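A short sketch of the usual producer-side pattern (assumes $producer, $topic and $payload were set up beforehand, and that a delivery report callback was registered with RdKafka\Conf::setDrMsgCb()):
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
// Non-blocking poll to serve any pending delivery report callbacks
$producer->poll(0);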
7)public void RdKafka::setLogLevel ( integer $level )
Specifies the maximum logging level produced by internal Kafka logging and debugging. If the "debug" configuration property is set, the level is automatically adjusted to LOG_DEBUG.
Parameters
level (integer)
Log level. Can take any LOG_* constant (see the syslog() function).
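For example, to suppress log messages less severe than warnings (LOG_WARNING is a standard PHP syslog constant; $producer is any RdKafka subclass instance):
$producer->setLogLevel(LOG_WARNING);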
RdKafka\Consumer class
This is the low-level Kafka consumer. It can be used with Kafka >= 0.8.
1)public RdKafka\Consumer::__construct ([ RdKafka\Conf $conf = NULL ] )
Parameters
conf (RdKafka\Conf)
An optional RdKafka\Conf instance.
This class only has the following methods, inherited from the low-level base class RdKafka:
RdKafka\Consumer extends RdKafka {
/* Methods */
/* Inherited methods */
public integer RdKafka::addBrokers ( string $broker_list )
public RdKafka\Metadata RdKafka::getMetadata ( bool $all_topics , RdKafka\Topic $only_topic = NULL , int $timeout_ms )
public integer RdKafka::getOutQLen ( void )
public RdKafka\Queue RdKafka::newQueue ( void )
public RdKafka\Topic RdKafka::newTopic ( string $topic_name [, RdKafka\TopicConf $topic_conf = NULL ] )
public void RdKafka::poll ( integer $timeout_ms )
public void RdKafka::setLogLevel ( integer $level )
}
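A minimal end-to-end sketch of the low-level consumer built from these methods plus RdKafka\ConsumerTopic (the broker address and topic name are placeholders):
$conf = new RdKafka\Conf();
$consumer = new RdKafka\Consumer($conf);
$consumer->addBrokers("127.0.0.1:9092");
$topic = $consumer->newTopic("test");
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
    $message = $topic->consume(0, 1000);
    if ($message === null || $message->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
        continue; // no message within the timeout
    }
    if ($message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        continue; // end of partition reached; not an error
    }
    if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        throw new \Exception($message->errstr());
    }
    echo $message->payload, "\n";
}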
RdKafka\Producer class
(PECL rdkafka >= 0.9.1)
1)public RdKafka\Producer::__construct ([ RdKafka\Conf $conf = NULL ] )
Parameters
conf (RdKafka\Conf)
An optional RdKafka\Conf instance.
RdKafka\Producer extends RdKafka {
/* Methods */
/* Inherited methods */
public integer RdKafka::addBrokers ( string $broker_list )
public RdKafka\Metadata RdKafka::getMetadata ( bool $all_topics , RdKafka\Topic $only_topic = NULL , int $timeout_ms )
public integer RdKafka::getOutQLen ( void )
public RdKafka\Queue RdKafka::newQueue ( void )
public RdKafka\Topic RdKafka::newTopic ( string $topic_name [, RdKafka\TopicConf $topic_conf = NULL ] )
public void RdKafka::poll ( integer $timeout_ms )
public void RdKafka::setLogLevel ( integer $level )
}
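A minimal producer sketch (the broker address and topic name are placeholders):
$conf = new RdKafka\Conf();
$producer = new RdKafka\Producer($conf);
$producer->addBrokers("127.0.0.1:9092");
$topic = $producer->newTopic("test");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "hello from php-rdkafka");
// Serve delivery callbacks and wait until the out queue is drained
while ($producer->getOutQLen() > 0) {
    $producer->poll(50);
}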
RdKafka\Topic class
(PECL rdkafka >= 0.9.1)
1)public string RdKafka\Topic::getName ( void )
Returns the topic name.
RdKafka\ConsumerTopic class
(PECL rdkafka >= 0.9.1)
This class represents a topic when using RdKafka\Consumer. It cannot be instantiated directly; RdKafka\Consumer::newTopic() should be used instead.
1)public RdKafka\Message RdKafka\ConsumerTopic::consume ( integer $partition , integer $timeout_ms )
Consumes a single message from a partition.
Consuming must have been started previously with RdKafka\ConsumerTopic::consumeStart().
The returned message's err property must be checked for errors.
An err property equal to RD_KAFKA_RESP_ERR__PARTITION_EOF signals that the end of the partition has been reached, which should generally not be treated as an error. The application should handle this case (e.g. ignore it).
Parameters
partition (integer)
The partition to consume
timeout_ms
The maximum amount of time to wait for a message to be received.
Returns a RdKafka\Message or NULL on timeout.
2)public void RdKafka\ConsumerTopic::consumeQueueStart ( integer $partition , integer $offset , RdKafka\Queue $queue )
Same as RdKafka\ConsumerTopic::consumeStart(), but re-routes incoming messages to the provided queue. The application must use one of the RdKafka\Queue::consume*() functions to receive fetched messages.
Parameters
partition (integer)
Partition ID
offset (integer)
Offset
queue (RdKafka\Queue)
A RdKafka\Queue instance
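A sketch of consuming several partitions through a single queue (continuing from the low-level consumer sketch above; partition numbers are illustrative):
$queue = $consumer->newQueue();
// Route partitions 0 and 1 into the same queue
$topic->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);
while (true) {
    $message = $queue->consume(1000);
    if ($message === null) {
        continue; // timeout
    }
    // check $message->err as with RdKafka\ConsumerTopic::consume()
}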
3)public void RdKafka\ConsumerTopic::consumeStart ( integer $partition , integer $offset )
Starts consuming messages for partition at offset (see the allowed values under Parameters).
librdkafka will attempt to keep queued.min.messages (a configuration property) messages in the local queue by repeatedly fetching batches of messages from the broker until the threshold is reached.
The application should use RdKafka\ConsumerTopic::consume() to consume messages from the local queue, each Kafka message being represented as a RdKafka\Message object.
RdKafka\ConsumerTopic::consumeStart() must not be called multiple times for the same topic and partition without first stopping consumption with RdKafka\ConsumerTopic::consumeStop().
Parameters
partition (integer)
Partition ID
offset (integer)
Can be either a proper offset (0..N), or one of the special offsets:
RD_KAFKA_OFFSET_BEGINNING
RD_KAFKA_OFFSET_END
RD_KAFKA_OFFSET_STORED
The return value of rd_kafka_offset_tail()
Example:
$partition = 123;
// consume from the end
$topic->consumeStart($partition, RD_KAFKA_OFFSET_END);
// consume from the stored offset
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
// consume 200 messages from the end
$topic->consumeStart($partition, rd_kafka_offset_tail(200));
4)public void RdKafka\ConsumerTopic::consumeStop ( integer $partition )
Stops consuming messages for partition, purging all messages currently in the local queue.
5)public void RdKafka\ConsumerTopic::offsetStore ( integer $partition , integer $offset )
Stores the offset for the given partition.
Parameters
partition (integer)
Partition ID
offset (integer)
Offset
RdKafka\ProducerTopic class
(PECL rdkafka >= 0.9.1)
This class represents a topic when using RdKafka\Producer. It cannot be instantiated directly; RdKafka\Producer::newTopic() should be used instead.
RdKafka\ProducerTopic extends RdKafka\Topic {
/* Methods */
public void produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] )
/* Inherited methods */
public string RdKafka\Topic::getName ( void )
}
1)public void RdKafka\ProducerTopic::produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] )
Produces and sends a single message to the broker. This call is asynchronous and non-blocking.
Parameters
partition (integer)
Can be either RD_KAFKA_PARTITION_UA (unassigned), for automatic partitioning using the topic's partitioner function (see RdKafka\TopicConf::setPartitioner()), or a fixed partition (0..N).
msgflags (integer)
Must be 0
payload (string)
Payload string
key (string)
Optional message key. If non-NULL it will be passed to the topic partitioner as well as be sent with the message to the broker and passed on to the consumer.
$message = [
'type' => 'account-created',
'id' => $accountId,
'date' => date(DATE_W3C),
];
$payload = json_encode($message);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $accountId);
RdKafka\Queue class
(PECL rdkafka >= 0.9.1)
1)public RdKafka\Message RdKafka\Queue::consume ( integer $timeout_ms )
Consumes a single message from the queue.
Parameters
timeout_ms
The maximum amount of time to wait for a message to be received.
Return Values
Returns a RdKafka\Message or NULL on timeout.
RdKafka\Message class
(PECL rdkafka >= 0.9.1)
This object represents either a single consumed or produced message, or an event ($err is set).
An application must check RdKafka\Message::err to see if the object is a proper message (error is RD_KAFKA_RESP_ERR_NO_ERROR) or an error event.
RdKafka\Message {
/* Properties */
public $err ; //Error code
public $topic_name ;
public $partition ;
public $payload ;
public $key ;
public $offset ;
/* Methods */
public string errstr ( void )
}
1)public string RdKafka\Message::errstr ( void )
Convenience method that returns the error as a string.
Return Values
The error as a string
if ($message->err) {
echo $message->errstr(), "\n";
}
RdKafka\Conf class
(PECL rdkafka >= 0.9.1)
This class holds configuration for consumers and producers.
A list of available properties can be found in the librdkafka repository (CONFIGURATION.md). Note that available configuration properties and default values may change depending on the librdkafka version.
RdKafka\Conf {
/* Methods */
public void dump ( void )
public void set ( string $name , string $value )
public void setDefaultTopicConf ( RdKafka\TopicConf $topic_conf )
public void setDrMsgCb ( callable $callback )
public void setErrorCb ( callable $callback )
public void setRebalanceCb ( callable $callback )
}
1)public void RdKafka\Conf::dump ( void )
Dumps the configuration properties and values to an array.
Return Values
Returns an array with configuration properties as keys, and configuration values as values.
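A quick sketch of inspecting the effective configuration:
$conf = new RdKafka\Conf();
$conf->set("group.id", "myGroupID");
foreach ($conf->dump() as $name => $value) {
    echo "$name = $value\n";
}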
2)public void RdKafka\Conf::set ( string $name , string $value )
Sets configuration property name to value.
3)public void RdKafka\Conf::setDefaultTopicConf ( RdKafka\TopicConf $topic_conf )
Sets the default topic configuration to use for automatically subscribed topics. This can be used along with RdKafka\KafkaConsumer::subscribe() or RdKafka\KafkaConsumer::assign().
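For instance (the property value is illustrative):
$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.offset.reset", "smallest");
$conf = new RdKafka\Conf();
$conf->set("group.id", "myGroupID");
$conf->setDefaultTopicConf($topicConf);
$kafkaConsumer = new RdKafka\KafkaConsumer($conf);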
4)public void RdKafka\Conf::setDrMsgCb ( callable $callback )
Sets the delivery report callback. The delivery report callback will be called once for each message accepted by RdKafka\ProducerTopic::produce(), with err set to indicate the result of the produce request.
The callback is called when a message is successfully produced, when librdkafka encounters a permanent failure, or when the retry counter for temporary errors has been exhausted.
An application must call RdKafka::poll() at regular intervals to serve queued delivery report callbacks.
Parameters
callback (callable)
A callable with the following signature:
<?php
/**
* @param RdKafka\Kafka $kafka
* @param RdKafka\Message $message
*/
function (RdKafka\Kafka $kafka, RdKafka\Message $message);
$conf->setDrMsgCb(function ($kafka, $message) {
if ($message->err) {
// message permanently failed to be delivered
} else {
// message successfully delivered
}
});
5)public void RdKafka\Conf::setErrorCb ( callable $callback )
Sets the error callback. The error callback is used by librdkafka to signal critical errors back to the application.
Parameters
callback (callable)
A callable with the following signature:
<?php
/**
* @param object $kafka
* @param int $err
* @param string $reason
*/
function ($kafka, $err, $reason);
<?php
$conf->setErrorCb(function ($kafka, $err, $reason) {
printf("Kafka error: %s (reason: %s)\n", rd_kafka_err2str($err), $reason);
});
?>
6)public void RdKafka\Conf::setRebalanceCb ( callable $callback )
Sets the rebalance callback for use with coordinated consumer group balancing.
Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.
The rebalance callback is responsible for updating librdkafka's assignment set based on the two events RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, but should also be able to handle arbitrary rebalancing failures where err is neither of those.
In this latter case (arbitrary error), the application must call $kafka->assign(NULL) to synchronize state.
Without a rebalance callback this is done automatically by librdkafka, but registering one gives the application flexibility to perform other operations along with the assignment/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).
Parameters
callback (callable)
A callable with the following signature:
<?php
/**
* @param RdKafka\KafkaConsumer $kafka
* @param int $err
* @param array $partitions
*/
function (RdKafka\KafkaConsumer $kafka, $err, $partitions);
The err parameter is set to RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS or RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS (or an arbitrary error).
The partitions parameter is an array of RdKafka\TopicPartition representing the full partition set that was assigned or revoked.
<?php
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
// application may load offsets from arbitrary external
// storage here and update partitions
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
if ($manual_commits) {
// Optional explicit manual commit
$kafka->commit($partitions);
}
$kafka->assign(NULL);
break;
default:
handle_unlikely_error($err);
$kafka->assign(NULL); // sync state
break;
}
});
?>
RdKafka\TopicConf class
(PECL rdkafka >= 0.9.1)
This class holds the configuration for topic instances.
A list of available topic properties can be found in the librdkafka repository (CONFIGURATION.md). Note that available configuration properties and default values may change depending on the librdkafka version.
RdKafka\TopicConf {
/* Methods */
public void dump ( void )
public void set ( string $name , string $value )
public void setPartitioner ( integer $partitioner )
}
1)public void RdKafka\TopicConf::dump ( void )
Dumps the configuration properties and values to an array.
Returns an array with configuration properties as keys, and configuration values as values.
2)public void RdKafka\TopicConf::set ( string $name , string $value )
Sets configuration property name to value.
3)public void RdKafka\TopicConf::setPartitioner ( integer $partitioner )
Sets the partitioner used to route messages to partitions based on their keys.
Parameters
partitioner (integer)
Must be one of the RD_KAFKA_MSG_PARTITIONER_* constants.
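For example (assuming $producer and $payload are defined as in the producer sketch above):
$topicConf = new RdKafka\TopicConf();
$topicConf->setPartitioner(RD_KAFKA_MSG_PARTITIONER_CONSISTENT);
$topic = $producer->newTopic("myTopic", $topicConf);
// Messages with the same key are now routed to the same partition
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, "user-42");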
RdKafka\Exception class
(PECL rdkafka >= 0.9.1)
The RdKafka exception class, thrown on errors.
RdKafka\Exception extends Exception {
/* Inherited properties */
protected string $message ;
protected int $code ;
protected string $file ;
protected int $line ;
/* Methods */
/* Inherited methods */
final public string Exception::getMessage ( void )
final public Exception Exception::getPrevious ( void )
final public mixed Exception::getCode ( void )
final public string Exception::getFile ( void )
final public int Exception::getLine ( void )
final public array Exception::getTrace ( void )
final public string Exception::getTraceAsString ( void )
public string Exception::__toString ( void )
final private void Exception::__clone ( void )
}
RdKafka\TopicPartition class
(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)
Represents a topic + partition (and optionally an offset).
RdKafka\TopicPartition {
/* Methods */
public integer getOffset ( void )
public integer getPartition ( void )
public string getTopic ( void )
public void setOffset ( integer $offset )
public ReturnType setPartition ( integer $partition )
public ReturnType setTopic ( string $topic_name )
}
1)public RdKafka\TopicPartition::__construct ( string $topic , integer $partition [, integer $offset = NULL ] )
Parameters
topic (string)
Topic name
partition (integer)
Partition ID
offset (integer)
Offset
<?php
new RdKafka\TopicPartition("myTopic", 1);
?>
2)public integer RdKafka\TopicPartition::getOffset ( void )
Gets the offset.
3)public integer RdKafka\TopicPartition::getPartition ( void )
Gets the partition ID.
4)public string RdKafka\TopicPartition::getTopic ( void )
Gets the topic name.
5)public void RdKafka\TopicPartition::setOffset ( integer $offset )
Sets the offset.
6)public ReturnType RdKafka\TopicPartition::setPartition ( integer $partition )
Sets the partition ID.
7)public ReturnType RdKafka\TopicPartition::setTopic ( string $topic_name )
Sets the topic name.
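A short sketch:
$tp = new RdKafka\TopicPartition("myTopic", 1, 42);
printf("%s[%d]@%d\n", $tp->getTopic(), $tp->getPartition(), $tp->getOffset());
$tp->setOffset(100);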
RdKafka\Metadata class
(PECL rdkafka >= 0.9.1)
The Metadata class represents broker information. Metadata instances are returned by RdKafka::getMetadata() and RdKafka\KafkaConsumer::getMetadata().
RdKafka\Metadata {
/* Methods */
public RdKafka\Metadata\Collection getBrokers ( void )
public int getOrigBrokerId ( void )
public string getOrigBrokerName ( void )
public RdKafka\Metadata\Collection getTopics ( void )
}
1)public RdKafka\Metadata\Collection RdKafka\Metadata::getBrokers ( void )
Get broker list
Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Broker
2)public int RdKafka\Metadata::getOrigBrokerId ( void )
Gets the ID of the broker that this metadata originated from.
3)public string RdKafka\Metadata::getOrigBrokerName ( void )
Gets the name of the broker that this metadata originated from.
4)public RdKafka\Metadata\Collection RdKafka\Metadata::getTopics ( void )
Gets the topic list. Depending on how the metadata was requested, this may contain a single topic, the list of locally known topics, or all topics in the cluster.
Returns a RdKafka\Metadata\Collection of RdKafka\Metadata\Topic
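A sketch of walking the metadata (assumes $producer is connected as in the producer example above; the getTopic()/getPartitions() accessors on the topic entries are assumed to come from RdKafka\Metadata\Topic):
$metadata = $producer->getMetadata(true, NULL, 60e3);
echo "origin broker: ", $metadata->getOrigBrokerName(), "\n";
foreach ($metadata->getTopics() as $topicMeta) {
    // each entry is assumed to be a RdKafka\Metadata\Topic
    printf("%s: %d partition(s)\n", $topicMeta->getTopic(), count($topicMeta->getPartitions()));
}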
RdKafka\Metadata\Collection class
(PECL rdkafka >= 0.9.1)
The Collection class is used as a collection of metadata items. It implements Countable and Iterator, so it can be used with count() and foreach.
RdKafka\Metadata\Collection implements Countable , Iterator {
/* Methods */
public int count ( void )
public mixed current ( void )
public scalar key ( void )
public void next ( void )
public void rewind ( void )
public boolean valid ( void )
}
1)public int RdKafka\Metadata\Collection::count ( void )
Returns the number of elements as an integer.
2)public mixed RdKafka\Metadata\Collection::current ( void )
Gets the current value.
Return Values
The current value if it is valid or NULL otherwise.
3)public scalar RdKafka\Metadata\Collection::key ( void )
Gets the current key.
Return Values
The current key if it is valid or NULL otherwise.
4)public void RdKafka\Metadata\Collection::next ( void )
Moves forward to the next element.
5)public void RdKafka\Metadata\Collection::rewind ( void )
Rewinds the Iterator to the first element.
6)public boolean RdKafka\Metadata\Collection::valid ( void )
Checks if the current position is valid.
Returns TRUE on success or FALSE on failure.
Predefined Constants
The constants below are defined by this extension, and will only be available when the extension has either been compiled into PHP or dynamically loaded at runtime.
RD_KAFKA_CONSUMER (integer)
RD_KAFKA_OFFSET_BEGINNING (integer)
Start consuming from beginning of kafka partition queue: oldest msg.
RD_KAFKA_OFFSET_END (integer)
Start consuming from end of kafka partition queue: next msg.
RD_KAFKA_OFFSET_STORED (integer)
Start consuming from offset retrieved from offset store.
RD_KAFKA_PARTITION_UA (integer)
The unassigned partition is used by the producer API for messages that should be partitioned using the configured or default partitioner.
RD_KAFKA_PRODUCER (integer)
RD_KAFKA_VERSION (integer)
RD_KAFKA_RESP_ERR__BEGIN (integer)
RD_KAFKA_RESP_ERR__BAD_MSG (integer)
Local: Bad message format
RD_KAFKA_RESP_ERR__BAD_COMPRESSION (integer)
Local: Invalid compressed data
RD_KAFKA_RESP_ERR__DESTROY (integer)
Local: Broker handle destroyed
RD_KAFKA_RESP_ERR__FAIL (integer)
Local: Communication failure with broker
RD_KAFKA_RESP_ERR__TRANSPORT (integer)
Local: Broker transport failure
RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE (integer)
Local: Critical system resource failure
RD_KAFKA_RESP_ERR__RESOLVE (integer)
Local: Host resolution failure
RD_KAFKA_RESP_ERR__MSG_TIMED_OUT (integer)
Local: Message timed out
RD_KAFKA_RESP_ERR__PARTITION_EOF (integer)
Broker: No more messages
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION (integer)
Local: Unknown partition
RD_KAFKA_RESP_ERR__FS (integer)
Local: File or filesystem error
RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC (integer)
Local: Unknown topic
RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN (integer)
Local: All broker connections are down
RD_KAFKA_RESP_ERR__INVALID_ARG (integer)
Local: Invalid argument or configuration
RD_KAFKA_RESP_ERR__TIMED_OUT (integer)
Local: Timed out
RD_KAFKA_RESP_ERR__QUEUE_FULL (integer)
Local: Queue full
RD_KAFKA_RESP_ERR__ISR_INSUFF (integer)
Local: ISR count insufficient
RD_KAFKA_RESP_ERR__NODE_UPDATE (integer)
Local: Broker node update
RD_KAFKA_RESP_ERR__SSL (integer)
Local: SSL error
RD_KAFKA_RESP_ERR__WAIT_COORD (integer)
Local: Waiting for coordinator
RD_KAFKA_RESP_ERR__UNKNOWN_GROUP (integer)
Local: Unknown group
RD_KAFKA_RESP_ERR__IN_PROGRESS (integer)
Local: Operation in progress
RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS (integer)
Local: Previous operation in progress
RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION (integer)
Local: Existing subscription
RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS (integer)
Local: Assign partitions
RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS (integer)
Local: Revoke partitions
RD_KAFKA_RESP_ERR__CONFLICT (integer)
Local: Conflicting use
RD_KAFKA_RESP_ERR__STATE (integer)
Local: Erroneous state
RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL (integer)
Local: Unknown protocol
RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED (integer)
Local: Not implemented
RD_KAFKA_RESP_ERR__AUTHENTICATION (integer)
Local: Authentication failure
RD_KAFKA_RESP_ERR__NO_OFFSET (integer)
Local: No offset stored
RD_KAFKA_RESP_ERR__END (integer)
RD_KAFKA_RESP_ERR_UNKNOWN (integer)
Unknown broker error
RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE (integer)
Broker: Offset out of range
RD_KAFKA_RESP_ERR_INVALID_MSG (integer)
Broker: Invalid message
RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART (integer)
Broker: Unknown topic or partition
RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE (integer)
Broker: Invalid message size
RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE (integer)
Broker: Leader not available
RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION (integer)
Broker: Not leader for partition
RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT (integer)
Broker: Request timed out
RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE (integer)
Broker: Broker not available
RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE (integer)
Broker: Replica not available
RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE (integer)
Broker: Message size too large
RD_KAFKA_RESP_ERR_STALE_CTRL_EPOCH (integer)
Broker: StaleControllerEpochCode
RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE (integer)
Broker: Offset metadata string too large
RD_KAFKA_RESP_ERR_NETWORK_EXCEPTION (integer)
Broker: Broker disconnected before response received
RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS (integer)
Broker: Group coordinator load in progress
RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE (integer)
Broker: Group coordinator not available
RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP (integer)
Broker: Not coordinator for group
RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION (integer)
Broker: Invalid topic
RD_KAFKA_RESP_ERR_RECORD_LIST_TOO_LARGE (integer)
Broker: Message batch larger than configured server segment size
RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS (integer)
Broker: Not enough in-sync replicas
RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND (integer)
Broker: Message(s) written to insufficient number of in-sync replicas
RD_KAFKA_RESP_ERR_INVALID_REQUIRED_ACKS (integer)
Broker: Invalid required acks value
RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION (integer)
Broker: Specified group generation id is not valid
RD_KAFKA_RESP_ERR_INCONSISTENT_GROUP_PROTOCOL (integer)
Broker: Inconsistent group protocol
RD_KAFKA_RESP_ERR_INVALID_GROUP_ID (integer)
Broker: Invalid group.id
RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID (integer)
Broker: Unknown member
RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT (integer)
Broker: Invalid session timeout
RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS (integer)
Broker: Group rebalance in progress
RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE (integer)
Broker: Commit offset data size is not valid
RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED (integer)
Broker: Topic authorization failed
RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED (integer)
Broker: Group authorization failed
RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED (integer)
Broker: Cluster authorization failed
RD_KAFKA_CONF_UNKNOWN (integer)
RD_KAFKA_CONF_INVALID (integer)
RD_KAFKA_CONF_OK (integer)
RD_KAFKA_MSG_PARTITIONER_RANDOM (integer)
The random partitioner. This was the default partitioner in librdkafka 0.8. Assigns partition randomly.
RD_KAFKA_MSG_PARTITIONER_CONSISTENT (integer)
The consistent partitioner. Uses consistent hashing to map identical keys onto identical partitions. Uses CRC32 as hashing function. Messages with no key or empty key are always assigned to the same partition.
RD_KAFKA_LOG_PRINT (integer)
The print logger. Prints messages to stderr.
RD_KAFKA_LOG_SYSLOG (integer)
The syslog logger. Sends messages to syslog.
RD_KAFKA_LOG_SYSLOG_PRINT (integer)
The syslog-print logger. Sends messages to syslog and prints them to stderr.
