kafka从入门到熟练使用(在阿里云上搭建kafka)


一、同步的通信方式解决多个服务之间通信的问题,会存在性能和稳定性的问题

  同步的方式存在的问题:系统的响应时间太长(商城系统为例:下订单之后,要去调用创建订单,减库存,发优惠卷这些服务,服务之间的通信时间是比较长的,全部完成后才响应用户订单创建成功)

             在同步的过程中要保证每个服务都顺利执行完,整个链路才执行完,因为网络等其他问题,整个链路成功执行完的成功率会受影响,导致用户体验较差。

       

       异步的通信方式:将下订单消息发给消息队列,订单直接创建成功,微服务去订阅拉取消息,再去执行。

        优势:明显提升系统的吞吐量

           即使有服务失败,也可以通过分布式事务解决方案来保证业务执行后的最终一致性

        

二、MQ的介绍

  消息队列的目的是为了通讯,屏蔽底层复杂的通讯协议,定义了一套应用层的、更加简单的通信协议,这种协议遵循生产者/消费者模型。

  消息队列同时还有异步、解耦的作用。

  消息队列的流派:有broker的MQ:重topic:kafka、rocketMQ、activeMQ(整个broker,必须依据topic进行消息的中转)

                  轻topic:rabbitMQ (topic只是它众多中转模式的一种)

          无broker的MQ:zeroMQ(直接使用socket进行通信)

  kafka是消息处理性能最快的MQ

三、kafka介绍:kafka系一个分布式、支持分区的、多副本的,基于zookeeper的分布式消息系统,它可以实时处理大量数据。

  1.kafka使用场景:  

        日志收集

        消息系统

        用户活动跟踪

        运营指标

  2.kafka的安装(需要安装jdk配置好环境变量)

    https://kafka.apache.org/downloads.html官网下一个

    解压之后,进入config目录编辑server.properties,修改下日志存放位置,服务器端口号,zookeeper的连接地址

      broker.id=0

      log.dirs=/opt/kafka/klog/kafka-logs

      listeners=PLAINTEXT://:9092

      zookeeper.connect=121.43.37.22:2181 

    然后进入bin目录下,./kafka-server-start.sh  --daemon(守护台运行) ../config/server.properties

    进入zookeeper目录下,进入zk客户端,ls / brokers/ids 看有没有0这个节点,有就启动成功咯

  3.kafka中的基本概念

    Broker:消息中间件处理节点,一个kafka节点就是一个Broker,一个或多个Broker可以构成一个kafka集群

    Topic:逻辑上的概念,kafka根据topic对消息进行分类,发布到kafka集群的每条消息都必须指定一个Topic

    Producer:消息生产者,向Broker发送消息的客户端

    Consumer:消息消费者,从Broker读取消息的客户端

    ConsumerGroup:每个Consumer属于一个特定的Consumer组,一条消息可以被多个不同的ConsumerGroup中的Consumer消费,但是一个ConSumerGroup中只有一个Consumer能够消费该条消息

    Partition:物理上的分区,一个topic可以分为多个partition,每个partion内部消息是有序的。

    ReplicationFactor:副本就是分区的备份,放到集群的其他节点上,副本数一般设置和broker节点数量一致

  4.创建主题topic

    ./kafka-topics.sh --create --zookeeper 121.43.37.22:2181 --replication-factor 1(1个副本) --partitions 1(1个分区) --topics mj666

   查看当前kafka有哪些topic

    ./kafka-topics.sh --list --zookeeper 121.43.37.22:2181

  5.发送消息给broker中的某个topic

    kafka⾃带了⼀个producer命令客户端

    ./kafka-console-producer.sh --broker-list 121.43.37.22:9092 --topic mj666

  6.消费消息

    对于consumer,kafka同样也携带了⼀个命令⾏客户端

    ⽅式⼀:从最后⼀条消息的偏移量+1开始消费(就是从最新发送的消息开始消费) ./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --topic mj666 

    ⽅式⼆:从头开始消费 ./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --from-beginning --topic mj666 

    消息是会被顺序存储的,消息是有偏移量的,可以指明偏移量进行消费

    进入到我们指定好的log目录

    

 

 

     cd 进入mj666分区目录 , ll命令,会看到一个index目录文件,和一个log消息文件,broker会将消息保存在本地的日志文件中,每个消费者消费消息的偏移量是通过内置的_consumer_offsets主题保存的,这个主题有0-49这50个分区

    

 

 

     kafka采用了分段存储,每一个log文件的大小默认是1GB,没生成一个log文件就会对应产生一个index文件,是和log文件的命名相同的。这样在进行消息检索的时候可以快速利用二分的方法进行查找,定位到某一个分段文件中。

    index文件中并没有为每一条message建立索引。而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引

    这里具体的细节看下这位大哥写的十分详细:https://blog.csdn.net/shudaqi2010/article/details/90815675

  7.单波消息和多播消息

    单波消息:多个消费者在同一个消费组中,只有一个消费者可以收到订阅的topic中的消息./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --consumer-999mj group.id=mjGroup --topic mj666

    多播消息:不同的消费组订阅同一个topic,不同的消费组中只有一个消费者能收到消息。换言之,有多个消费者收到了同一条消息。

          ./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --consumer-mj1 group.id=mjGroup  --topic mj666

          ./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092 --consumer-mj2 group.id=mjGroup1 --topic mj666

  8.查看消费组以及信息

    查看当前主题下有哪些消费组./kafka-consumer-groups.sh --bootstrap-server 121.43.37.22:9092 --list

    查看消费组中的具体信息:⽐如当前偏移量、最后⼀条消息的偏移量、堆积的消息数量 ./kafka-consumer-groups.sh --bootstrap-server 121.43.37.22:9092 --describe --group mjGroup 

    

 

四、kafka主题和分区的概念

 

   一个主题中的消息量非常大,可以通过设置分区来分布式存储这些消息,如一个topic创建了3个分区,那么topic中的消息会分别存放在3个分区中。

    - 分区存储,解决了同一存储文件过大的问题

    - 提供了读写的吞吐量,读和写同时可以在多个分区中进行

  

  主题创建多个分区 ./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1   --topic mj1234

  查看topic的分区信息./kafka-topics.sh --describe --zookeeper localhost:2181 --topic mj1234

    - 每个消费者定期将⾃⼰消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是(consumerGroupId+topic+分区号),value就是当前offset的值,kafka会定期清理topic⾥的消息,最后就保留最新的那条数据

    - 因为__consumer_offsets可能会接收⾼并发的请求,kafka默认给其分配50个分区(可以通过offsets.topic.num.partitions设置),这样可以通过加机器的⽅式抗⼤并发。

    - 通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区公式:hash(consumerGroupId) % __consumer_offsets主题的分区数

五、kafka集群的搭建

 1.搭建3个broker的集群,再准备一个server1.properties,server2.properties,复制server.properties,做如下更改

    broker.id=1

    listeners=PLAINTEXT://:9093(这一条千万不要填ip地址,否则集群启动不起来,只有一个能起来,其他的要报端口被9092占用)

    log.dir=/usr/local/data/kafka-logs-2

   (advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners) 云服务器上要把这条配置打开,填上你的ip地址端口号,不然访问不到。

    broker.id=2

    listeners=PLAINTEXT://:9094

    log.dir=/usr/local/data/kafka-logs-3

    

  启动:./kafka-server-start.sh -daemon ../config/server.properties

     ./kafka-server-start.sh -daemon ../config/server1.properties

     ./kafka-server-start.sh -daemon ../config/server2.properties   

  搭建完后通过查看zk中的/brokers/ids 看是否启动成功,此时有这[0,1,3]2个节点

  *(内存不足:请查看这位大哥的https://blog.csdn.net/xukaics/article/details/48543881)

 

  编辑kafka-server-start.sh 成这样就行了 

 2.副本的概念

  副本是对分区的备份。在集群中,不同的副本会被部署在不同的broker上。下⾯例⼦:创建1个主题,2个分区、3个副本。 

  ./kafka-topics.sh --create --zookeeper 121.43.37.22:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

  查看主题信息:/kafka-topics.sh --describe --zookeeper 121.43.37.22:2181 --topic my-replicated-topic

  

 

   replicas:当前副本存在的broker节点 

   isr:可以同步的broker节点和已同步的broker节点,存放在isr集合中,如果isr中的节点性能较差,会被踢出isr集合

   leader:副本⾥的概念,消息发送⽅要把消息发给哪个broker?就看副本的leader是在哪个broker上⾯。副本⾥的leader专⻔⽤来接收消息。接收到消息,其他follower通过poll的⽅式来同步数据。

   follower:leader处理所有针对这个partition的读写请求,⽽follower被动复制leader,不提供读写。如果leader所在的broker挂掉,那么就会进⾏新leader的选举

  

 

   - 集群中有多个borker,创建主题时可以指明主题有多个分区(拆分存储),可以为分区创建多个副本,不同的副本放在不同的broker中。

   -  _consumer_offsets只有一个broker节点中有

 3.集群消息的发送

  ./kafka-console-producer.sh --broker-list 121.43.37.22:9092,121.43.37.22:9093,121.43.37.22:9094 --topic my-replicated-topic 

 4.kafka集群消息的消费

  ./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092,121.43.37.22:9093,121.43.37.22:9094 --from-beginning --topic my-replicated-topic 

  指定消费组消费消息

  ./kafka-console-consumer.sh --bootstrap-server 121.43.37.22:9092,121.43.37.22:9093,121.43.37.22:9094 --from-beginning  --consumer-mj123  group.id=mj1234       --topic my-replicated-topic

   

 5.关于分区消费组消费者的细节 

  

 

 

      -  每个broker中有多个partition,⼀个partition只能被⼀个消费组⾥的某⼀个消费者消费,是为了保证消费顺序。Kafka只在partition的范围内保证消息消费的局部顺序性,当消息发给多个partition的话,消费者partition顺序不能保证是发送                    消息的顺序,所以不能在同⼀个topic中的多个partition中保证总的消费顺序性。

    - 一个消费者挂了的话,kafka的rebalance机制会把这个消费者的分区消息拿给其他消费者消费。

    - 消费组中消费者的数量不能⽐⼀个topic中的partition数量多,否则多出来的消费者消费不到消息。

六、kafka的java客户端-⽣产者的实现

  1.⽣产者的基本实现

    -引⼊依赖

    <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency>    

    -代码   

public class MySimpleProducer {

private final static String TOPIC_NAME="abc";

public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"121.43.37.22:9092,
121.43.37.22:9093,
121.43.37.22:9094
");
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
     //第一个参数为主题名字,第二个参数为分区号,第三个参数key在不传入分区号的情况下通过算法(hash(key)%partitionnum)决定往哪个分区发消息,第4个参数为发送的消息 
  
     //final ProducerRecord<String,String> record = new ProducerRecord(TOPIC_NAME,"mykeyValue", "helloKafka");

final ProducerRecord<String,String> record = new ProducerRecord(TOPIC_NAME,0,"mykeyValue", "helloKafka");
       //1.生产者同步发消息
// RecordMetadata recordMetadata = producer.send(record).get();
// System.out.println(recordMetadata.topic()+":"+recordMetadata.partition()+":"+recordMetadata.offset());
//2.生产者异步发消息
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e!=null){
System.out.println("失败"+e.getStackTrace());
}
if (recordMetadata!=null){
System.out.println("成功"+recordMetadata.topic()+recordMetadata.partition()+recordMetadata.offset());
}
}

}) ;
Thread.sleep(1000000000L);

}
}

   2.生产者的同步发送消息:如果⽣产者发送消息没有收到kafka的ack,⽣产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进⾏重试。重试的次数3次。  

      ⽣产者的异步发送消息:异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker在收到消息后异步调⽤⽣产者提供的callback回调⽅法。

    例如上面的异步发送代码,生产者发送完消息之后主线程直接关闭了,sleep之后才能看到回调的消息。

   3.生产者相关配置

   在同步发送的前提下,⽣产者在获得集群返回的ack之前会⼀直阻塞。那么集群什么时候返回ack呢?   

        ack配置:ack = 0 kafka-cluster不需要任何的broker收到消息,就⽴即返回ack给⽣产者,最容易丢消息的,效率是最⾼的 

          ack=1(默认): 多副本之间的leader已经收到消息,并把消息写⼊到本地的log中,才会返回ack给⽣产者,性能和安全性是最均衡的

          ack=-1/all。⾥⾯有默认的配置min.insync.replicas=2(默认为1,推荐配置⼤于等于2),此时就需要leader和⼀个follower同步完后,才会返回ack给⽣产者(此时集群中有2个broker已完成数据的接收),这种⽅式最安全,但性能最差。

    

 

 

     ack和重试(如果没有收到ack,就开启重试)的配置:

    props.put(ProducerConfig.ACKS_CONFIG, "1");

     发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造成消息重复发送,⽐如⽹络抖动,所以需要在 接收者那边做好消息接收的幂等性处理 

    props.put(ProducerConfig.RETRIES_CONFIG, 3);//重试次数设置

     props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);//重试间隔设置

    4.消息发送的缓冲区配置

    

 

 

     kafka默认会创建⼀个消息缓冲区,⽤来存放要发送的消息,缓冲区是32m

     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

     kafka本地线程会去缓冲区中⼀次拉16k的数据,发送到broker

    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

    如果线程拉不到16k的数据,间隔10ms也会将已拉到的数据发到broker

    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

七、kafka的java客户端-消费者的实现

   1.基本实现

public class MyConsumer {
private final static String TOPIC_NAME = "1885";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"121.43.37.22:9092,121.43.37.22:9093");
//添加消费组
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建⼀个消费者的客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// 消费者订阅主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) { /* * poll() API 是拉取消息的⻓轮询 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
{ System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }
}

   2.消费者自动提交和手动提交offset

    无论是自动还是手动,都需要把所属的消费组+消费的主题+消费的分区和消费的偏移量提交到集群的_consumer_offsets主题里面

    (1)自动提交:消费者poll消息下来以后自动提交offset,自动提交会丢失消息,消费者在消费前提交offset,可能提交完后还没有消费就挂掉,下一个消费组中的消费者会从提交的offset开始消费,之前未被消费的消息就丢失了。

      // 是否⾃动提交offset,默认就是true。  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

      // ⾃动提交offset的间隔时间 。   props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

     (2)手动提交: // 自动提交配置改为false  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        手动同步:消息消费完后,提交offset,会阻塞到offset提交成功后集群返回ACK。

//手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) { /* * poll() API 是拉取消息的⻓轮询 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
{ System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }

//所有的消息已经消费完
if (records.count() > 0) {//有消息
// ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
// ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
consumer.commitSync();




}
}


        手动异步:消息消费完后提交,不会阻塞不需要等待集群ACK,直接执行之后的逻辑,可以设置一个回调方法,供集群调用 

            

        *⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了

 

//手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
while (true) { /* * poll() API 是拉取消息的⻓轮询 */
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records)
{ System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }

//所有的消息已经消费完
if (records.count() > 0) {//有消息
// ⼿动同步提交offset,当前线程会阻塞直到offset提交成功
// ⼀般使⽤同步提交,因为提交之后⼀般也没有什么逻辑代码了
consumer.commitSync();

// ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯的程序逻辑
consumer.commitAsync(new OffsetCommitCallback()
{
@Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception)
{ if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " + exception.getStackTrace());
} } });}


}
}    3.⻓轮询poll消息
      默认情况下,消费者⼀次会poll500条消息。
      //⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
      代码中设置了⻓轮询的时间是1000毫秒
      while (true) { /* * poll() API 是拉取消息的⻓轮询 */ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); }
    
  *那么:(1)如果⼀次poll到500条,就直接执⾏for循环
      如果这⼀次没有poll到500条。且时间在1秒内,那么⻓轮询继续poll,要么到500条,要么到1s。如果多次poll都没达到500条,且1秒时间到了,那么直接执⾏for循环
      (2)如果两次poll的间隔超过30s,集群会认为该消费者的消费能⼒过弱,该消费者被踢出消费组,触发rebalance机制,rebalance机制会造成性能开销。
        可以通过设置这个参数,让⼀次poll的消息条数少⼀点
         ⼀次poll最⼤拉取消息的条数,可以根据消费速度的快慢来设置 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
    

       

 

     4.消费者的健康状态检查

      消费者每隔1s向kafka集群发送⼼跳,集群发现如果有超过10s没有续约的消费者,将被踢出消费组,触发该消费组的rebalance机制,将该分区交给消费组⾥的其他消费者进⾏消费。

      //consumer给broker发送⼼跳的间隔时间 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

      //kafka如果超过10秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进⾏rebalance,把分区分配给其他消费者。 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

     5.指定分区和偏移量、时间消费

      指定分区消费:consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

      从头消费:  consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

             consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

      指定offset消费:consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));  

              consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

      指定时间消费:根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费

    

public class TimestampConsumer {

public static void main(String[] args) {

Properties props = new Properties();
props.put("bootstrap.servers", ":9092,:9092,:9092");
props.put("group.id", "dev3-yangyunhe-topic001-group001");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "lj";

try {
// 获取topic的partition信息
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new ArrayList<>();

Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date now = new Date();
long nowTime = now.getTime();
System.out.println("当前时间: " + df.format(now));
long fetchDataTime = nowTime - 1000 * 60 * 60; // 计算30分钟之前的时间戳

for(PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), fetchDataTime);
}

consumer.assign(topicPartitions);

// 获取每个partition一个小时之前的偏移量
Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);

OffsetAndTimestamp offsetTimestamp = null;
System.out.println("开始设置各分区初始偏移量...");
for(Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
// 如果设置的查询偏移量的时间点大于最大的索引记录时间,那么value就为空
offsetTimestamp = entry.getValue();
if(offsetTimestamp != null) {
int partition = entry.getKey().partition();
long timestamp = offsetTimestamp.timestamp();
long offset = offsetTimestamp.offset();
System.out.println("partition = " + partition +
", time = " + df.format(new Date(timestamp))+
", offset = " + offset);
// 设置读取消息的偏移量
consumer.seek(entry.getKey(), offset);
}
}
System.out.println("设置各分区初始偏移量结束...");

while(true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
}

 

      6.新消费组的消费offset规则

      新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的offset+1开始消费(消费新消息)。可以通过以下的设置,让新的消费者第⼀次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量+1)

      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

      earliest 
      当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 
      latest 
      当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 
      none 
      topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

 

 

八、springboot项目整合kafka

   1.引入依赖

    

  <dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

  2.编写配置文件

  

server:
port: 8080

spring:
kafka:
bootstrap-servers: 47.108.203.233:9092,47.108.203.233:9095
#独立的生产者和消费者只需要各自加各自的配置即可
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
ack-mode: manual_immediate
# ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
#redis:
#host: 121.43.37.22
#port: 6379

 3.创建生产者controller

  

@RestController
@RequestMapping("/msg")public class MyKafkaController {

private final static String TOPIC_NAME = "lj";

@Resource
private KafkaTemplate<String,String> kafkaTemplate;

@RequestMapping("/send")
public String sendMessage(){
kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
System.out.println("发送成功");
return "send success!";

}}
4.创建消费者

  

@Component
public class MyConsumer {

/* @KafkaListener(topics = "lj", groupId = "MyGroup778")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset
ack.acknowledge();
}*/

   //通过注解指定消费多个主题,多个分区,并指定消费的偏移量,指定消费者总数 
@KafkaListener(groupId = "99",
topicPartitions = {
@TopicPartition(topic = "lj", partitions = {"0", "1"}),
@TopicPartition(topic = "lb", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "30")) },concurrency = "3")
//concurrency就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset(这里配置的是每一条记录,即处理一条消息马上提交)
ack.acknowledge();
}
}

 九、kafka集群中的controller、rebalance、HW

  1.controller

     每个broker启动的时候会在zk创建一个临时序号节点,获得最小序号的那个broker就为成为kafka集群controller。

        - 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。选举规则:isr集合中最左边的节点(性能最高的)

        - 当检测到某个分区的ISR集合发⽣变化时(broker的数量发生变化),由控制器负责通知所有broker更新其元数据信息。

        - 当使⽤kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。

  2.rebalance机制

  前提:消费者没有指明分区消费。

  触发: 当消费组⾥消费者和分区的关系发⽣变化,那么就会触发rebalance机制,重新调整消费者消费哪个分区

  在触发rebalance机制之前,消费者消费哪个分区有三种策略:

      range:通过公式来计算某个消费者消费哪个分区 (第一个消费者:sum(分区总数)/n(消费者数量)+1 (sum/n有余数时)   其他消费者:sum/n)

       轮询:  轮流去分分区来消费

      sticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进⾏调整。(若是没有开启粘合这种策略的话,触发rebalance之后,则会切掉之前所有的关系,重新按照range或者轮询分配)

    

  3.HW和LEO

  LEO是某个副本(副本的作用是高可用)最后消息的消息位置(log-end-offset)

  HW是已完成同步的位置(一个partition对应的isr中最小的LEO),consumer最多只能消费到HW所在的位置。

  每个replica都有HW,leader和follower各⾃负责更新⾃⼰的HW的状态。对于leader新写⼊的消息,consumer不能⽴刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。

  这样的⽬的是防⽌leader所在的broker挂掉造成消息的丢失,该消息仍可以从新选举的leader中获取。

 

⼗、Kafka中的优化问题

  1.如何防⽌消息丢失

  发送⽅: ack是1 或者-1/all 可以防⽌消息丢失,ack设成all,把min.insync.replicas设置>=2

  消费⽅:把⾃动提交改为⼿动提交。

  2.如何防⽌重复消费 

   在防⽌消息丢失的⽅案中,如果⽣产者发送完消息后,因为⽹络抖动,没有收到ack,但实际上broker已经收到了。此时⽣产者会进⾏重试,于是broker就会收到多条相同的消息,⽽造成消费者的重复消费。

     ⽣产者关闭重试:会造成丢消息(不建议)

     消费者解决⾮幂等性消费问题:所谓的幂等性:多次访问的结果是⼀样的。(对于rest的请求(get(幂等)、post(⾮幂等)、put(幂等)、delete(幂等))

     解决⽅案:(1)mysql插⼊业务id(unique),所以⼀次只能插⼊⼀条.(创建表之后ALTER TABLE `表名` ADD unique(`serverid`))

          (2)使⽤redis或zk的分布式锁,以业务id为锁。保证只有⼀条记录能够创建成功

 3.如何做到消息的顺序消费

    ⽣产者:保证消息按顺序消费,且消息不丢失——使⽤同步的发送,ack设置成⾮0的值。等到发送成功再发送下⼀条。确保消息是顺序发送的。

    消费者:主题只能设置一个分区,消费组只能有一个消费者。

    kafka顺序消费牺牲掉了性能,使用场景不多,RocketMQ有实现的功能 

 4.解决消息积压问题(消费者消费消息速度跟不上生产速度,导致kafka中有大量的数据没有被消费,随着没有被消费的消息越来越多,消费者寻址的性能会越来越差,导致整个kafka集群对外提供服务的性能越来越差,造成服务雪崩)

    解决方案:

        ⽅案⼀:在⼀个消费者中启动多个线程,让多个线程同时消费。——提升⼀个消费者的消费能⼒。

        (还可通过业务的架构设计,提升业务层面的消费性能)

        方案二:如果⽅案⼀还不够的话,创建多个消费组,启动多个消费者,多个消费者部署在不同的服务器上。其实多个消费者部署在同⼀服务器上也可以提⾼消费能⼒——充分利⽤服务器的cpu资源。

        ⽅案三:让⼀个消费者去把收到的消息往另外⼀个topic上发,另⼀个topic设置多个分区和多个消费者 ,进⾏具体的业务消费。(不常用)    

 5.延迟队列

    1)应⽤场景

    订单创建后,超过30分钟没有⽀付,则需要取消订单,这种场景可以通过延时队列来实现  

    2)具体⽅案

    - kafka中创建创建相应的主题(topic_5s、topic_30m)

    - 消费者消费该主题的消息(轮询)

    - 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没⽀付)

      -如果是:去数据库中修改订单状态为已取消

      -如果不是:记录当前消息的offset,并不再继续消费之后的消息。

            等待1分钟后,再次向kafka拉取该offset及之后的消息,继续进⾏判断,以此反复 

 

十一、Kafka-eagle监控平台(EFAK)

   1)安装jdk 

   2)官网下载:http://download.kafka-eagle.org/解压

   3)配置环境变量:vim /etc/profile  配置立即生效source /etc/profile

   export KE_HOME=/usr/local/kafka-eagle

   export PATH=$PATH:$KE_HOME/bin 

   4)修改配置文件:vim system-config.properties

     修改⾥⾯的zk的地址和mysql的地址

   5)启动:./ke.sh start

   6)访问 公网ip:8048/ke   

 

 

 

 输入这个绿色的账号密码

 

 就可以查看信息了

                 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM