While connecting to another system's Kafka cluster, I ran into the following problem:
org.springframework.kafka.support.LoggingProducerListener [76] [http-nio-9050-exec-1]- Exception thrown when sending a message with key='null' and payload='test' to topic lapm_notice: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
At first I just followed the integration steps found online and wrote the producer client step by step, but when testing reached the send() call it failed with the exception above. Search results claimed the topic was unavailable, yet the cluster belongs to another team, so there was nothing I could do about that; I ruled it out. After several days of trying different approaches, the real cause turned out to be a version mismatch: my producer used kafka-clients 1.0.1, while the remote cluster was running Kafka 0.9.0.1. A 0.9.x broker predates the API-version negotiation that newer clients rely on, so the 1.0.1 client can never complete its metadata update and send() blocks until max.block.ms expires (60000 ms by default). After switching the client to a version matching the cluster, sending succeeded.
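While chasing this kind of metadata timeout, it can help to make the failure surface quickly instead of waiting the full minute. A minimal debugging sketch, assuming the props object from the configuration class shown below; the 5000 ms value is just an illustrative choice:

// Hypothetical debugging tweak: lower max.block.ms so a metadata failure
// shows up within seconds instead of the default 60000 ms seen in the log above.
// (Assumption: props already carries bootstrap.servers and the serializers.)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000");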
Here is my producer configuration code:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Properties;

@Component
public class KafkaProducerUtil implements InitializingBean {

    @Resource
    private Properties props;

    @Resource
    private KafkaConfig kafkaConfig;

    private KafkaProducer<String, String> producer;

    public void init() {
        producer = new KafkaProducer<String, String>(getProps());
    }

    public KafkaProducer<String, String> getProducer() {
        return producer;
    }

    private Properties getProps() {
        // Broker list as ip:port; separate cluster nodes with commas
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getServers());
        // Key serializer class
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Value serializer class
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        init();
    }
}
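For completeness, a minimal sketch of sending through this utility. The service class and its name are hypothetical; only the topic name lapm_notice is taken from the exception log above:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service
public class NoticeSender {

    @Resource
    private KafkaProducerUtil kafkaProducerUtil;

    public void sendNotice(String message) {
        // Topic name taken from the error log above; adjust for your cluster
        ProducerRecord<String, String> record =
                new ProducerRecord<>("lapm_notice", message);
        // Asynchronous send; the callback surfaces failures such as the
        // TimeoutException described above
        kafkaProducerUtil.getProducer().send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            }
        });
    }
}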
The Maven dependencies to import are:
<!-- Kafka client, required for Kafka output -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
In addition, sending messages through a log4j2 configuration is also quite practical (this is the demo a company provided; unfortunately my project was not structured that way), but it requires log4j2 as your logging framework. The configuration:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration name="defaultConfiguration" status="off" strict="true" monitorInterval="5">
    <properties>
        <!-- Log output pattern -->
        <property name="patternlayout">%date{yyyy-MM-dd HH:mm:ss,SSS}|%level|%thread|%class{36}|%M|%msg%xEx|$|$|%n</property>
        <!-- Kafka address of the monitoring/alerting platform (test environment) -->
        <property name="kafkaServers">127.0.0.1:9020</property>
    </properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternlayout}" />
        </Console>
        <!-- KafkaAppender for alert logs; lapm_notice is the alert topic -->
        <Kafka name="NoticeKafkaAppender" topic="lapm_notice" syncSend="false">
            <PatternLayout pattern="${patternlayout}" />
            <Property name="bootstrap.servers">${kafkaServers}</Property>
        </Kafka>
        <!-- KafkaAppender for statistics logs; lapm_statistics is the statistics topic -->
        <Kafka name="StatisticsKafkaAppender" topic="lapm_statistics" syncSend="false">
            <PatternLayout pattern="${patternlayout}" />
            <Property name="bootstrap.servers">${kafkaServers}</Property>
        </Kafka>
        <!-- KafkaAppender for scene logs; lapm_scene is the scene topic -->
        <Kafka name="SceneKafkaAppender" topic="lapm_scene" syncSend="false">
            <PatternLayout pattern="${patternlayout}" />
            <Property name="bootstrap.servers">${kafkaServers}</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <AsyncRoot level="info" includeLocation="true">
            <AppenderRef ref="Console" />
        </AsyncRoot>
        <!-- AsyncLogger for alert logs -->
        <AsyncLogger name="Notice" level="info" includeLocation="true" additivity="true">
            <AppenderRef ref="NoticeKafkaAppender" />
        </AsyncLogger>
        <!-- AsyncLogger for statistics logs -->
        <AsyncLogger name="Statistics" level="info" includeLocation="true" additivity="true">
            <AppenderRef ref="StatisticsKafkaAppender" />
        </AsyncLogger>
        <!-- AsyncLogger for scene logs -->
        <AsyncLogger name="Scene" level="info" includeLocation="true" additivity="true">
            <AppenderRef ref="SceneKafkaAppender" />
        </AsyncLogger>
    </Loggers>
</Configuration>
/** Alert logger */
private Logger noticeLogger = LoggerFactory.getLogger("Notice");

noticeLogger.error(NoticeLogUtils.getNoticeInfo(businessCode, userCode, NoticeType.NOTICE_WARN,
        url, arguments, error, sysModuleCode, requestId, requestSerial)
        + "custom part of the alert log message");
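The same idea as a self-contained sketch; the class and messages are illustrative, and only the logger names Notice and Statistics come from the log4j2.xml above, which routes each named logger to its own Kafka topic:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaLogDemo {

    // Each named logger is bound to a KafkaAppender by the AsyncLogger
    // definitions in log4j2.xml
    private static final Logger noticeLogger = LoggerFactory.getLogger("Notice");
    private static final Logger statisticsLogger = LoggerFactory.getLogger("Statistics");

    public static void main(String[] args) {
        noticeLogger.error("alert message routed to topic lapm_notice");
        statisticsLogger.info("statistics message routed to topic lapm_statistics");
    }
}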
The Maven configuration is as follows:
<!-- Log4j 2 -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>${log4j2.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>${log4j2.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-web</artifactId>
    <version>${log4j2.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>${log4j2.version}</version>
</dependency>
<!-- SLF4J -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>jcl-over-slf4j</artifactId>
    <version>${slf4j.version}</version>
    <scope>runtime</scope>
</dependency>
<!-- Disruptor, required by Log4j 2 async loggers -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>${lmax.version}</version>
</dependency>
<!-- Kafka client, required for Kafka output -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>${kafka.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>