kafka生產者配置遇到的坑


接入其他系統的kafka集群時,遇到了以下問題:

org.springframework.kafka.support.LoggingProducerListener [76] [http-nio-9050-exec-1]- Exception thrown when sending a message with key='null' and payload='test' to topic lapm_notice:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

 起初還是照抄網上給的接入方法,按部就班地開始寫客戶端……結果寫完后測試,執行到send方法時出現了上面這個問題。找度娘,說是因為你的topic不可用。我去,這個是調別人的集群,我哪里能管到,所以這個原因果斷pass。熬了好幾天,試了幾種不同的方法,終於得以見天日——是版本的問題:我使用的kafka生產者(客戶端)版本是1.0.1,但是人家集群的版本是0.9.0.1。我去,差了這么個版本就讓我掛了,果斷把客戶端換成與集群一致的版本之后,發消息成功。

 下面是我的配置代碼:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Properties;

/**
 * Spring component that owns a single {@link KafkaProducer} with String keys and values.
 *
 * <p>The producer is created in {@link #afterPropertiesSet()} (via {@link #init()}) once the
 * {@code props} and {@code kafkaConfig} dependencies have been injected, and is handed out
 * through {@link #getProducer()}.
 *
 * <p>The class implements {@link AutoCloseable} so the producer can be released instead of
 * being left open for the lifetime of the JVM. Spring's bean-destruction handling is expected
 * to invoke {@link #close()} on context shutdown for beans implementing {@code AutoCloseable};
 * NOTE(review): confirm this for the Spring version in use, otherwise call {@code close()}
 * explicitly.
 */
@Component
public class KafkaProducerUtil implements InitializingBean, AutoCloseable {

    /** Base producer settings supplied by the container; never modified by this class. */
    @Resource
    private Properties props;

    /** Source of the bootstrap server list. */
    @Resource
    private KafkaConfig kafkaConfig;

    /** The managed producer; {@code null} before {@link #init()} and after {@link #close()}. */
    private KafkaProducer<String, String> producer;

    /**
     * Creates the producer from the effective configuration returned by {@link #getProps()}.
     */
    public void init() {
        producer = new KafkaProducer<>(getProps());
    }

    /**
     * Returns the managed producer.
     *
     * @return the producer, or {@code null} if it has not been initialised or was closed
     */
    public KafkaProducer<String, String> getProducer() {
        return producer;
    }

    /**
     * Builds the producer configuration: the injected base properties plus the bootstrap
     * servers and the String key/value serializers.
     *
     * <p>The settings are written into a private copy so that the injected {@code Properties}
     * bean, which other beans may share, is left untouched.
     *
     * @return a new {@code Properties} instance holding the effective producer configuration
     */
    private Properties getProps() {
        Properties effective = new Properties();
        effective.putAll(props);
        // Server ip:port; multiple brokers of a cluster are separated by commas.
        effective.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getServers());
        // Serializer class for record keys.
        effective.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Serializer class for record values.
        effective.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return effective;
    }

    /**
     * Initialises the producer after dependency injection has completed.
     *
     * @throws Exception if producer creation fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        init();
    }

    /**
     * Closes the managed producer, if any, and clears the reference so a closed producer is
     * never handed out by {@link #getProducer()}. Calling this method more than once is safe.
     */
    @Override
    public void close() {
        KafkaProducer<String, String> current = producer;
        producer = null;
        if (current != null) {
            current.close();
        }
    }
}

 

導入的maven為:

 

<!-- Kafka client dependencies, required for Kafka output -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
        <!-- Keep the SLF4J-to-log4j 1.x binding (slf4j-log4j12) off the classpath -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>

 

另外,以log4j2的方式配置發消息的方式也是相當實用的(某司給的demo,可惜我的架構不是這種的),但是這種需要你的日志框架是log4j2,代碼:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration name="defaultConfiguration" status="off" strict="true" monitorInterval="5">
    <properties>
        <!-- Log output pattern shared by every appender below -->
        <property name="patternlayout">%date{yyyy-MM-dd HH:mm:ss,SSS}|%level|%thread|%class{36}|%M|%msg%xEx|$|$|%n</property>
        <!-- Kafka address of the monitoring/alerting platform in the test environment -->
        <property name="kafkaServers">127.0.0.1:9020</property>
    </properties>

    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternlayout}" />
        </Console>

        <!-- KafkaAppender for alert (notice) logs; lapm_notice is the alert Kafka topic -->
        <Kafka name="NoticeKafkaAppender" topic="lapm_notice" syncSend="false">
            <PatternLayout pattern="${patternlayout}" />
            <Property name="bootstrap.servers">${kafkaServers}</Property>
        </Kafka>

        <!-- KafkaAppender for statistics logs; lapm_statistics is the statistics Kafka topic -->
        <Kafka name="StatisticsKafkaAppender" topic="lapm_statistics" syncSend="false">
            <PatternLayout pattern="${patternlayout}" />
            <Property name="bootstrap.servers">${kafkaServers}</Property>
        </Kafka>

        <!-- KafkaAppender for scene logs; lapm_scene is the scene Kafka topic -->
        <Kafka name="SceneKafkaAppender" topic="lapm_scene" syncSend="false">
            <PatternLayout pattern="${patternlayout}" />
            <Property name="bootstrap.servers">${kafkaServers}</Property>
        </Kafka>
    </Appenders>

    <Loggers>
        <!-- Root logger writes to the console only -->
        <AsyncRoot level="info" includeLocation="true">
            <AppenderRef ref="Console" />
        </AsyncRoot>

        <!-- AsyncLogger for alert (notice) logs -->
        <AsyncLogger name="Notice" level="info" includeLocation="true" additivity="true">
            <AppenderRef ref="NoticeKafkaAppender" />
        </AsyncLogger>

        <!-- AsyncLogger for statistics logs -->
        <AsyncLogger name="Statistics" level="info" includeLocation="true" additivity="true">
            <AppenderRef ref="StatisticsKafkaAppender" />
        </AsyncLogger>

        <!-- AsyncLogger for scene logs -->
        <AsyncLogger name="Scene" level="info" includeLocation="true" additivity="true">
            <AppenderRef ref="SceneKafkaAppender" />
        </AsyncLogger>
    </Loggers>
</Configuration>
/** Logger for alert (notice) logs; obtained under the name "Notice", the same name as the Notice AsyncLogger in the Log4j2 configuration. */
private Logger noticeLogger = LoggerFactory.getLogger("Notice");

// Logs at ERROR level the string built by NoticeLogUtils.getNoticeInfo(...) followed by a
// free-form, application-defined suffix.
noticeLogger.error(NoticeLogUtils.getNoticeInfo(businessCode, userCode, NoticeType.NOTICE_WARN, url, arguments, error, sysModuleCode, requestId, requestSerial)
				+ "異常預警日志系統自定義部分");

 

 maven配置如下:

<!-- Log4j2 dependencies -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>${log4j2.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>${log4j2.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-web</artifactId>
    <version>${log4j2.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>${log4j2.version}</version>
</dependency>

<!-- SLF4J dependencies -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>jcl-over-slf4j</artifactId>
    <version>${slf4j.version}</version>
    <scope>runtime</scope>
</dependency>

<!-- Log4j2 asynchronous logging requires the LMAX disruptor -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>${lmax.version}</version>
</dependency>

<!-- Kafka client dependencies, required for Kafka output -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
        <!-- Keep the SLF4J-to-log4j 1.x binding (slf4j-log4j12) off the classpath -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM