Kafka消費者——結合spring開發


Kafka消費者端

可靠性保證

作為消費端,消費數據需要考慮的是:

1、不重復消費消息

2、不缺失消費消息

自動提交 offset 的相關參數:

enable.auto.commit: 是否開啟自動提交 offset 功能(true)
auto.commit.interval.ms: 自動提交 offset 的時間間隔 (1000ms = 1s)

手動提交offset 的相關參數:

enable.auto.commit: 是否開啟自動提交 offset 功能(false)

異步提交也個缺點,那就是如果服務器返回提交失敗,異步提交不會進行重試。相比較起來,同步提交會進行重試直到成功或者最后拋出異常給應用。異步提交沒有實現重試是因為,如果同時存在多個異步提交,進行重試可能會導致位移覆蓋。舉個例子,假如我們發起了一個異步提交commitA,此時的提交位移為2000,隨后又發起了一個異步提交commitB且位移為3000;commitA提交失敗但commitB提交成功,此時commitA進行重試並成功的話,會將實際上將已經提交的位移從3000回滾到2000,導致消息重復消費。

雖然同步提交 offset 更可靠一些,但是由於其會阻塞當前線程,直到提交成功。因此吞吐量會收到很大的影響。因此更多的情況下,會選用異步提交 offset 的方式。

無論是同步提交還是異步提交 offset,都有可能會造成數據的漏消費或者重復消費。先提交 offset 后消費,有可能造成數據的漏消費;而先消費后提交 offset,有可能會造成數據的重復消費 。所以,在保證數據完整性的前提下,選擇同步提交同時盡量能在消費端進行消息去重的操作。

spring-kafka消費者端

spring-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/context
	http://www.springframework.org/schema/context/spring-context.xsd
	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">

    <context:component-scan base-package="listener" />
    <!--<context:component-scan base-package="concurrent" />-->


    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <!--broker集群-->
                <entry key="bootstrap.servers" value="192.168.25.10:9092,192.168.25.11:9092,192.168.25.12:9092"/>
                <!--groupid-->
                <entry key="group.id" value="group1"/>
                <!--
                earliest 
                當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 
                latest 
                當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 
                none 
                topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
                -->
                <entry key="auto.offset.reset" value="earliest "/>
                <!--自動提交-->
                <entry key="enable.auto.commit" value="false"/>
                <!--自動提交重試等待時間-->
                <entry key="auto.commit.interval.ms" value="1000"/>
                <!--檢測消費者故障的超時-->
                <entry key="session.timeout.ms" value="30000"/>
                <!--key反序列化-->
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <!--value反序列化-->
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>
    <!--consumer工廠-->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg  >
            <list>
                <value>topic1</value>
                <value>topic2</value>
            </list>
        </constructor-arg>
        <property name="messageListener" ref="kafkaConsumerListener"/>
		<property name="pollTimeout" value="1000"/>
		<property name="AckMode" value="MANUAL"/>
    </bean>

    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" >
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>

    <!-- 並發消息監聽容器,執行doStart()方法 -->
<!--    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
        <constructor-arg ref="consumerFactory" />
        <constructor-arg ref="containerProperties" />
        &lt;!&ndash;#消費監聽器容器並發數&ndash;&gt;
        &lt;!&ndash;concurrency = 3&ndash;&gt;
        <property name="concurrency" value="3" />
    </bean>-->
</beans>

AckMode
RECORD每處理一條commit一次

BATCH(默認)每次poll的時候批量提交一次,頻率取決於每次poll的調用頻率

TIME 每次間隔ackTime的時間去commit(跟auto commit interval有什么區別呢?)

COUNT 累積達到ackCount次的ack去commit

COUNT_TIMEackTime或ackCount哪個條件先滿足,就commit

MANUAL listener負責ack,但是背后也是批量上去

MANUAL_IMMEDIATE listner負責ack,每調用一次,就立即commit

KafkaConsumerListener類

(同步提交)

@Component
public class KafkaConsumerListener implements AcknowledgingMessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> stringStringConsumerRecord, Acknowledgment acknowledgment) {
        System.out.printf("offset= %d, key= %s, value= %s,topic= %s,partition= %s\n",
                stringStringConsumerRecord.offset(),
                stringStringConsumerRecord.key(),
                stringStringConsumerRecord.value(),
                stringStringConsumerRecord.topic(),
                stringStringConsumerRecord.partition());
                acknowledgment.acknowledge();
    }
}

測試

    @Test
    public  void consumer() {
        ApplicationContext context = new ClassPathXmlApplicationContext("listener.xml");
        System.out.printf("啟動listener");
        while (true) {

        }
    }

結果:

offset= 57, key= null, value= 2019-11-19 03:40:45,topic= topic1,partition= 0
offset= 4929, key= null, value= 2019-11-19 03:40:47,topic= topic2,partition= 2

kafka消費者如何才能從頭開始消費某個topic的全量數據

消費者要從頭開始消費某個topic的全量數據,需要滿足2個條件(spring-kafka):

(1)使用一個全新的"group.id"(就是之前沒有被任何消費者使用過);

(2)指定"auto.offset.reset"參數的值為earliest;

對應的spring-kafka消費者客戶端配置參數為:

<!-- 指定消費組名 -->
<entry key="group.id" value="fg11"/>
<!-- 從何處開始消費,latest 表示消費最新消息,earliest 表示從頭開始消費,none表示拋出異常,默認latest -->
<entry key="auto.offset.reset" value="earliest"/>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM