kafka生產實踐


1.引言

 

 

最近接觸到一個APP流量分析的項目,類似於友盟。涉及到幾個C端(客戶端)高並發的接口,這幾個接口主要用於C端數據的提交。在沒有任何緩沖的情況下,一個接口涉及到5張表的提交。壓測的結果很不理想,原因有以下幾點:

1.業務上處理原始數據,封裝數據庫對象,需要時間

2.服務與RDS的多張表進行交互

3.web接口同步處理業務

 

導致一台雙核,16G機子,單實例,jdbc最大連接數100,吞吐量竟然只有50TPS。

能想到的改造方案就是引入一層緩沖,讓C端接口不與RDS直接交互,也不直接處理業務,很自然就想到了rabbitmq,但是rabbitmq對分布式的支持比較一般,我們的數據體量也比較大,所以我們借鑒了友盟,引入了kafka,Kafka是一種高吞吐量的分布式發布訂閱消息系統,起初在不做任何kafka優化的時候,簡單地將C端提交的數據直接send到單節點kafka,就這樣,我們的吞吐量達到了100TPS.還是有點小驚喜的。

最近一段時間研究了一下kafka,對一些參數進行調整,目前接口的吞吐量已經達到220TPS,寫這篇文章主要想記錄一下自己優化和部署經歷。

 

 

 

2.kafka簡介

 

                             kafka的結構圖

 

這張圖很好的詮釋了kafka的結構,但是遺漏了一點,就是group的概念,我這里補充一下,一個組可以包含多個consumer對多個topic進行消費,但是不同組的消費都是獨立的。

也就是說同一個topic的同一條消息可以被不同組的consumer消費。

 

我這里的主要的優化途徑就是將kafka集群化,多partition化,使其並發度更高。

集群化都很好理解,那什么是多partition?

partition是topic的一個概念,即對topic進行分組,不同partition之間的消費相互獨立,且有序。並且任意Partition在某一個時刻只能被一個Consumer Group內的一個Consumer消費,所以咯,假如topic只有一個partition的話,那么在一個Consumer Group內有效的消費者實例最多也就1個,並發度就會受限於kafka的partition數目。

上面都是講消費,其實send操作也是一樣的,要保證有序必然要等上一個發送ack之后,下一個發送才能進行,如果只有一個partition,那send之后的ack的等待時間必然會阻塞下面一次send,設計多個partition之后,可以同時往多個partition發送消息,自然吞吐量也就上去。

 

 

3.kafka集群的搭建以及參數配置

 

集群搭建

准備兩台機子,然后去官網(http://kafka.apache.org/downloads)下載一個包。通過scp到服務器上,解壓進入config目錄,編輯server.config.

第一台機子配置(172.18.240.36):

broker.id=0  每台服務器的broker.id都不能相同


#hostname
host.name=172.18.240.36

#在log.retention.hours=168 下面新增下面三項
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#設置zookeeper的連接端口
zookeeper.connect=172.18.240.36:4001
#默認partition數
num.partitions=2

第二台機子配置(172.18.240.62):

broker.id=1  每台服務器的broker.id都不能相同


#hostname
host.name=172.18.240.62

#在log.retention.hours=168 下面新增下面三項
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#設置zookeeper的連接端口
zookeeper.connect=172.18.240.36:4001
#默認partition數
num.partitions=2

 

新增或者修改成以上配置。

對了,在此之前請先安裝zookeeper,如果你用的是zookeeper集群的話,zookeeper.connect可以填寫多個,中間用逗號隔開。

 

 

然后啟動

nohup  ./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

 

 

測試一下:

在第一台機子上開啟一個producer

./kafka-console-producer.sh --broker-list 172.18.240.36:9092 --topic test-test

在第二台機子上開啟一個consumer

./kafka-console-consumer.sh --bootstrap-server 172.18.240.62:9092 --topic test-test --from-beginning

 

第一台機子發送一條消息

第二台機子立馬收到消息

這樣kafka的集群部署就完成了。就下來我們來看看,java的客戶端代碼如何編寫。

 

 

 

4.kafka客戶端代碼示例

 

我這里的工程是建立在spring boot 之下的,僅供參考。

在 application.yml下添加如下配置:

kafka:
  consumer:
    default:
      server: 172.18.240.36:9092,172.18.240.62:9092
      enableAutoCommit: false
      autoCommitIntervalMs: 100
      sessionTimeoutMs: 15000
      groupId: data_analysis_group
      autoOffsetReset: latest
  producer:
    default:
      server: 172.18.240.36:9092,172.18.240.62:9092
      retries: 0
      batchSize: 4096
      lingerMs: 1
      bufferMemory: 40960

添加兩個配置類

package com.dtdream.analysis.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

import java.util.HashMap;
import java.util.Map;

@ConfigurationProperties(
        prefix = "kafka.consumer.default"
)
@EnableKafka
@Configuration
public class KafkaConsumerConfig {


    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    private static String autoCommitIntervalMs;

    private static String sessionTimeoutMs;

    private static Class keyDeserializerClass = StringDeserializer.class;

    private static Class valueDeserializerClass = StringDeserializer.class;

    private static String groupId = "test-group";

    private static String autoOffsetReset = "latest";

    private static String server;

    private static boolean enableAutoCommit;

    public static String getServer() {
        return server;
    }

    public static void setServer(String server) {
        KafkaConsumerConfig.server = server;
    }

    public static boolean isEnableAutoCommit() {
        return enableAutoCommit;
    }

    public static void setEnableAutoCommit(boolean enableAutoCommit) {
        KafkaConsumerConfig.enableAutoCommit = enableAutoCommit;
    }

    public static String getAutoCommitIntervalMs() {
        return autoCommitIntervalMs;
    }

    public static void setAutoCommitIntervalMs(String autoCommitIntervalMs) {
        KafkaConsumerConfig.autoCommitIntervalMs = autoCommitIntervalMs;
    }

    public static String getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }

    public static void setSessionTimeoutMs(String sessionTimeoutMs) {
        KafkaConsumerConfig.sessionTimeoutMs = sessionTimeoutMs;
    }

    public static Class getKeyDeserializerClass() {
        return keyDeserializerClass;
    }

    public static void setKeyDeserializerClass(Class keyDeserializerClass) {
        KafkaConsumerConfig.keyDeserializerClass = keyDeserializerClass;
    }

    public static Class getValueDeserializerClass() {
        return valueDeserializerClass;
    }

    public static void setValueDeserializerClass(Class valueDeserializerClass) {
        KafkaConsumerConfig.valueDeserializerClass = valueDeserializerClass;
    }

    public static String getGroupId() {
        return groupId;
    }

    public static void setGroupId(String groupId) {
        KafkaConsumerConfig.groupId = groupId;
    }

    public static String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    public static void setAutoOffsetReset(String autoOffsetReset) {
        KafkaConsumerConfig.autoOffsetReset = autoOffsetReset;
    }


    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
            @Override
            public boolean filter(ConsumerRecord<String, String> consumerRecord) {
                log.debug("partition is {},key is {},topic is {}",
                        consumerRecord.partition(), consumerRecord.key(), consumerRecord.topic());
                return false;
            }
        });
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;

    }


   /* @Bean
    public Listener listener() {
        return new Listener();
    }*/
}

 

package com.dtdream.analysis.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 * User: chenqimiao
 * Date: 2017/7/24
 * Time: 9:43
 * To change this template use File | Settings | File Templates.
 */
@ConfigurationProperties(
        prefix = "kafka.producer.default",
        ignoreInvalidFields = true
)//注入一些屬性域
@EnableKafka
@Configuration//使得@Bean注解生效
public class KafkaProducerConfig {
    private static String server;
    private static Integer retries;
    private static Integer batchSize;
    private static Integer lingerMs;
    private static Integer bufferMemory;
    private static Class keySerializerClass = StringSerializer.class;
    private static Class valueSerializerClass = StringSerializer.class;

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
        return props;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public static String getServer() {
        return server;
    }

    public static void setServer(String server) {
        KafkaProducerConfig.server = server;
    }

    public static Integer getRetries() {
        return retries;
    }

    public static void setRetries(Integer retries) {
        KafkaProducerConfig.retries = retries;
    }

    public static Integer getBatchSize() {
        return batchSize;
    }

    public static void setBatchSize(Integer batchSize) {
        KafkaProducerConfig.batchSize = batchSize;
    }

    public static Integer getLingerMs() {
        return lingerMs;
    }

    public static void setLingerMs(Integer lingerMs) {
        KafkaProducerConfig.lingerMs = lingerMs;
    }

    public static Integer getBufferMemory() {
        return bufferMemory;
    }

    public static void setBufferMemory(Integer bufferMemory) {
        KafkaProducerConfig.bufferMemory = bufferMemory;
    }

    public static Class getKeySerializerClass() {
        return keySerializerClass;
    }

    public static void setKeySerializerClass(Class keySerializerClass) {
        KafkaProducerConfig.keySerializerClass = keySerializerClass;
    }

    public static Class getValueSerializerClass() {
        return valueSerializerClass;
    }

    public static void setValueSerializerClass(Class valueSerializerClass) {
        KafkaProducerConfig.valueSerializerClass = valueSerializerClass;
    }

    @Bean(name = "kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

利用kafkaTemplate即可完成發送。

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;


 @RequestMapping(
            value = "/openApp",
            method = RequestMethod.POST,
            produces = MediaType.APPLICATION_JSON_UTF8_VALUE,
            consumes = MediaType.APPLICATION_JSON_UTF8_VALUE
    )
    @ResponseBody
    public ResultDTO openApp(@RequestBody ActiveLogPushBo activeLogPushBo, HttpServletRequest request) {

        logger.info("openApp: activeLogPushBo {}, dateTime {}", JSONObject.toJSONString(activeLogPushBo),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));

        String ip = (String) request.getAttribute("ip");

        activeLogPushBo.setIp(ip);

        activeLogPushBo.setDate(new Date());

        //ResultDTO resultDTO = dataCollectionService.collectOpenInfo(activeLogPushBo);

        kafkaTemplate.send("data_collection_open",JSONObject.toJSONString(activeLogPushBo));

       // logger.info("openApp: resultDTO {} ,dateTime {}", resultDTO.toJSONString(),new DateTime().toString("yyyy-MM-dd HH:mm:ss.SSS"));

        return new ResultDTO().success();
    }
kafkaTemplate的send方法會更根據你指定的key進行hash,再對partition數進行去模,最后決定發送到那一個分區,假如沒有指定key,那send方法對分區的選擇是隨機。具體怎么隨機的話,這里就不展開講了,有興趣的同學可以自己看源碼,我們可以交流交流。



接着配置一個監聽器
package com.dtdream.analysis.listener;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.Optional;
@Component
public class Listener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"test-topic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            logger.info("message is {} ", message);
        }
    }
}

 

@KafkaListener其實可以具體指定消費哪個分區,如果不指定的話,並且只有一個消費者實例,那么這個實例會消費所有的分區的消息。

消費者的數量是一定要少於partition的數量的,不然沒有任何意義。會出現消費者過剩的情況。


消費者數量和partition數量的多與少,會動態影響消費節點所消費的partition數目,最終會在整個集群中達到一種動態平衡。
具體的關系可以參考http://www.jianshu.com/p/6233d5341dfe



5.總結


理論上只要cpu核心數無限,那么partition數也可以無上限,與此同時消費者節點和生產者節點也可以無上限,最終會使單個topic的並發無上限。單機的cpu的核心數總是會達到一個上限,kafka作為分布式系統,可以很好利用集群的運算能力,進行動態擴展,在DT時代,應該會慢慢成為主流吧。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM