kafka 搭建以及 kafka收集log


  1. 首先搭建kafka和zookeeper,找帖子
    搭建時需要注意

    其次,保證zookeeper是集群環境,也可以代建偽集群,不然啟動不起來報:
    partitions have leader brokers without a matching listener。
  2. 一個生產者的消息如果沒有key值,則會隨機到每個分區,
    搭建時出了個問題,因為kafka配置文件默認partition為1,所以在我本地兩個消費者始終有一個消費者消費不到消息,
    后面改完配置文件分區設置2個,but 沒起作用,依然只有一個消費者能消費到消息

    各種查。。。網上也是都在說topic會發送到不同的partition,不同的partition會對應不同的消費者,吳國。。。
    后面在生產者新建一個topic_b 然后好了,看來是一個topic在生成時就會有一些操作來對應partition,后面在查閱。

  3. 卡夫卡是真的。測試幾萬條數據/嗖嗖就消費完了/
  4. kafka充當消息中間件
    利用spring boot
    pom.xml
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>

    application.yml

      kafka:
        bootstrap-servers: ip:9093,ip:9094,ip:9095
        consumer:
          group-id:  consumer-default
          auto-commit-interval: 100
          auto-offset-reset: earliest
          enable-auto-commit: true
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    如果是單機kafka 則配置一個即可
    a.生產者:

     @Autowired private KafkaTemplate kafkaTemplate; .... kafkaTemplate.send("topic_name","message"); 

    b.消費者:

    @Component public class KafkaCustomer { private static int count = 0; @KafkaListener(topics = {"topic_a"}) public void listen (ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value()); } }

    期間遇到的問題上述已解釋哈/

  5. kafka 充當日志收集
    默認使用logback(springboot自帶)
    上配置主要配置logback-spring.xml即可,利用springboot搭建的消費者即可看到kafka的收集的日志。
    有一個問題哈/為什么用kafka收集呢?是優勢,更大的優勢可能是可以由不同的消費者,來分析日志。
    問題在於原先本人搭建過ELK收集日志,后來網上說logstash太消耗本地內存,后用filebeat來收集,
    logstash來解析日志,現在就是filebeat和kafka的區別和優勢又有哪些,筆記一下,后續查閱。
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration scan="true" scanPeriod="60 seconds" debug="false">
        <contextName>logback</contextName>
        <logger name="org.apache.kafka" level="OFF"/>
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
    
        <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>D:\log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>logback.%d{yyyy-MM-dd}.log</fileNamePattern>
            </rollingPolicy>
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
        <appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
            <topic>loges</topic>
            <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostNameKeyingStrategy" />
            <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
            <producerConfig>bootstrap.servers=ip:9093,ip:9094,ip:9095</producerConfig>
        </appender>
        <logger name="Application_ERROR">
            <appender-ref ref="KafkaAppender"/>
        </logger>
        <root level="debug">
            <appender-ref ref="console" />
            <appender-ref ref="file" />
            <appender-ref ref="KafkaAppender" />
        </root>
    </configuration>

    期間的問題:問題如下放, 一開始將日志級別設置成info沒有問題發生,但當把級別設成debug時,就卡住不動了,
    這個問題我認為主要是由於org.apache.kafka這個包底下有用到log.debug()方法,但項目加載到log.xml時log還為起作用,導致log.xml調用log.debug()時報錯進而導則timeout。

    解決方法,將org.apache.kafka報下的日志關閉,即:<logger name="org.apache.kafka" level="OFF"/>,即可解決下方的錯誤。

    22:25:40.199 logback [main] DEBUG o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-1] Exception occurred during message send:
    org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

  6. 會繼續更新。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM