一個消費者組有一個或者多個消費者
如果一個消費者組訂閱了主題,那么這個主題中的某個分區只能分配給消費者組中的某個消費者,不能分配給多個消費者。
當1個消費者,6個分區時,將讀取6個分區中的數據。
當3個消費者,6個分區時,每個消費者讀取其中的2個分區的數據
當6個消費者,6個分區時,每個消費者讀取其中的一個分區的數據。
當7個消費者,6個分區時,有一個消費者將不讀取數據,這樣會造成資源浪費。
在訂閱主題時,自動或手動分配分區。
單數據消費的MessageListener
批量消費的BatchMessageListener
具備ACK機制的AcknowledgingMessageListener和BatchAcknowledgingMessageListener
@KafkaListener
id:消費者的id;當GroupId沒有被配置時,默認將id用作GroupId(由idIsGroup屬性控制,默認為true)
containerFactory:上面提到了@KafkaListener區分單數據還是多數據消費只需要配置一下注解的containerFactory屬性就可以了,
這里面配置的是監聽容器工廠,也就是ConcurrentKafkaListenerContainerFactory,配置BeanName
topics:需要監聽的Topic,可監聽多個
topicPartitions:可配置更加詳細的監聽信息,例如只監聽某個Topic中的指定分區,或者從指定偏移量(如offset為200)開始監聽
errorHandler:監聽異常處理器,配置BeanName
groupId:消費組ID
idIsGroup:id是否為GroupId
clientIdPrefix:消費者Id前綴
beanRef:真實監聽容器的BeanName,需要在 BeanName前加 "__"
springboot整合kafka消費者代碼
com.xml.utils.XmlParse : xml解析類

package com.xml.utils;

import com.mongodb.domain.BaseDeviceLog;
import com.mongodb.domain.Key;
import com.utils.common.DateUtils;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.function.Consumer;

/**
 * Parses the device-log XML payload (root &gt; Logs &gt; Log*) received from Kafka
 * into {@link BaseDeviceLog} entities.
 */
public class XmlParse {

    private static final Logger logger = LoggerFactory.getLogger(XmlParse.class);

    /**
     * Parses an XML string into a list of device logs.
     * On any parse error, the entries parsed so far are returned and the error is
     * logged together with the offending payload.
     *
     * @param xmlString raw XML document text
     * @return parsed logs; empty (never null) when nothing could be parsed
     */
    public static List<BaseDeviceLog> parseXmlFromString(String xmlString) {
        List<BaseDeviceLog> baseDeviceLogList = new ArrayList<>();
        try {
            Document document = DocumentHelper.parseText(xmlString);
            Element logsElement = document.getRootElement().element("Logs");
            Iterator iterator = logsElement.elementIterator("Log");
            while (iterator.hasNext()) {
                baseDeviceLogList.add(parseLog((Element) iterator.next()));
            }
        } catch (Exception ex) {
            // Pass the throwable to the logger so the stack trace is preserved
            // (logging only ex.getMessage() loses it).
            logger.error("解析錯誤的xml字符串:" + xmlString, ex);
        }
        return baseDeviceLogList;
    }

    /** Maps one &lt;Log&gt; element onto a {@link BaseDeviceLog}. */
    private static BaseDeviceLog parseLog(Element logElement) throws Exception {
        BaseDeviceLog baseDeviceLog = new BaseDeviceLog();
        setIfPresent(logElement, "Guid", baseDeviceLog::setLogGuid);
        setIfPresent(logElement, "SerialNumber", baseDeviceLog::setSerialNumber);
        setIfPresent(logElement, "ProductNumber", baseDeviceLog::setProductNumber);
        setIfPresent(logElement, "SystemID", baseDeviceLog::setSystemId);
        setIfPresent(logElement, "Type", baseDeviceLog::setLogType);
        setIfPresent(logElement, "SourceName", baseDeviceLog::setSourceName);
        setIfPresent(logElement, "Description", baseDeviceLog::setDescription);
        setIfPresent(logElement, "Level", baseDeviceLog::setLogLevel);
        String priority = logElement.elementTextTrim("Priority");
        if (priority != null) {
            // BUGFIX: was setLogType(...), which clobbered the value already read
            // from <Type>; <Priority> maps to the LogPriority field.
            baseDeviceLog.setLogPriority(priority.toUpperCase());
        }
        setIfPresent(logElement, "UID", baseDeviceLog::setUid);
        setIfPresent(logElement, "AttamentFilePath", baseDeviceLog::setAttamentFilePath);
        setIfPresent(logElement, "AttamentFileMD5", baseDeviceLog::setAttamentFileMD5);
        setIfPresent(logElement, "DevLogFilePath", baseDeviceLog::setDevLogFilePath);
        setIfPresent(logElement, "DevLogFileMD5", baseDeviceLog::setDevLogFileMD5);
        String operationTime = logElement.elementTextTrim("OperationDateTime");
        if (operationTime != null) {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
            baseDeviceLog.setOperationDateTime(simpleDateFormat.parse(operationTime));
        }
        baseDeviceLog.setUploadDateTime(DateUtils.getUtcDate());

        List<Key> itemTemp = new ArrayList<>();
        Element itemElement = logElement.element("Item");
        // Guard: a <Log> without an <Item> section no longer NPEs and aborts the document.
        if (itemElement != null) {
            Iterator keyIterator = itemElement.elementIterator();
            while (keyIterator.hasNext()) {
                itemTemp.add(parseKey((Element) keyIterator.next()));
            }
        }
        baseDeviceLog.setItem(itemTemp);
        return baseDeviceLog;
    }

    /** Maps one key element (with Name/threshold/Unit attributes and value children) onto a {@link Key}. */
    private static Key parseKey(Element keyElement) {
        Key key = new Key();
        if (keyElement.attribute("Name") != null) {
            key.setName(keyElement.attributeValue("Name"));
        }
        if (keyElement.attribute("LowerThreshold") != null) {
            key.setThredHoldMin(keyElement.attributeValue("LowerThreshold"));
        }
        if (keyElement.attribute("UpperThreshold") != null) {
            key.setThredHoldMax(keyElement.attributeValue("UpperThreshold"));
        }
        if (keyElement.attribute("Unit") != null) {
            key.setUnit(keyElement.attributeValue("Unit"));
        }
        List<String> valueList = new ArrayList<>();
        Iterator valueIterator = keyElement.elementIterator();
        while (valueIterator.hasNext()) {
            valueList.add(((Element) valueIterator.next()).getTextTrim());
        }
        key.setValue(valueList);
        return key;
    }

    /** Invokes {@code setter} with the trimmed text of child {@code tag} when that child exists. */
    private static void setIfPresent(Element logElement, String tag, Consumer<String> setter) {
        String value = logElement.elementTextTrim(tag);
        if (value != null) {
            setter.accept(value);
        }
    }
}
com.mongodb.dao.UserDao : 操作MongoDB的Dao層

package com.mongodb.dao; import com.mongodb.domain.BaseDeviceLog; import java.util.List; /** * MongoDB DAO layer for {@link BaseDeviceLog} documents. Collections are addressed * by name, since logs are routed to per-system collections. */
public interface UserDao { /** Inserts a single document into the named collection, creating it if absent. */ void save(String collectionName, BaseDeviceLog baseDeviceLog); /** Bulk-inserts documents into the named collection. (NOTE(review): "inserList" is a typo for "insertList", kept because renaming would break implementors/callers.) */ void inserList(String collectionName,List<BaseDeviceLog> baseDeviceLogs); /** Fuzzy query by serial number against a fixed collection — see UserDaoImpl. */ List<BaseDeviceLog> find(); /** * Finds all documents of the given collection. * @param collectionName name of the MongoDB collection to read * @return
com.mongodb.dao.UserDaoImpl : MongoDB的各種數據操作

package com.mongodb.dao;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.BulkOperations;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.mongodb.domain.BaseDeviceLog;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.regex.Pattern;

/** MongoDB data-access operations for {@link BaseDeviceLog} documents. */
@Component
public class UserDaoImpl implements UserDao {

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Inserts a single document, creating the collection on first use.
     *
     * @param collectionName target collection
     * @param baseDeviceLog  document to persist
     */
    @Override
    public void save(String collectionName, BaseDeviceLog baseDeviceLog) {
        if (!mongoTemplate.collectionExists(collectionName)) {
            mongoTemplate.createCollection(collectionName);
        }
        mongoTemplate.save(baseDeviceLog, collectionName);
    }

    /**
     * Bulk-inserts documents (unordered — one failed insert does not stop the rest),
     * creating the collection on first use. A null/empty list is a no-op.
     *
     * @param collectionName target collection
     * @param baseDeviceLogs documents to persist
     */
    @Override
    public void inserList(String collectionName, List<BaseDeviceLog> baseDeviceLogs) {
        if (!mongoTemplate.collectionExists(collectionName)) {
            mongoTemplate.createCollection(collectionName);
        }
        if (!CollectionUtils.isEmpty(baseDeviceLogs)) {
            BulkOperations operations =
                    mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collectionName);
            for (BaseDeviceLog baseDeviceLog : baseDeviceLogs) {
                operations.insert(baseDeviceLog);
            }
            operations.execute();
        }
    }

    /**
     * Fuzzy query: documents in collection "101022010050" whose SerialNumber
     * contains "600054" (case-insensitive).
     */
    @Override
    public List<BaseDeviceLog> find() {
        // BUGFIX: the old pattern "%600054%" used SQL LIKE wildcards; '%' is a literal
        // character in a regex, so the query could never match. An unanchored regex
        // already performs a substring match.
        Pattern pattern = Pattern.compile("600054", Pattern.CASE_INSENSITIVE);
        Query query = new Query();
        query.addCriteria(Criteria.where("SerialNumber").regex(pattern));
        return mongoTemplate.find(query, BaseDeviceLog.class, "101022010050");
    }

    /**
     * Finds all documents of the given collection.
     *
     * @param collectionName collection to read
     * @return every document mapped to BaseDeviceLog
     */
    @Override
    public List<BaseDeviceLog> findAllByCollectionName(String collectionName) {
        return mongoTemplate.findAll(BaseDeviceLog.class, collectionName);
    }

    /**
     * Queries a collection with the optional conditions carried by the query object;
     * every non-empty field contributes an equality criterion, and the upload/operation
     * date pairs contribute range criteria.
     *
     * @param collectionName     collection to read
     * @param baseDeviceLogQuery optional filter values (null/empty fields are skipped)
     * @return matching documents
     */
    @Override
    public List<BaseDeviceLog> getByCollectionNameAndCondition(String collectionName,
                                                               BaseDeviceLogQuery baseDeviceLogQuery) {
        Query query = new Query();
        if (!StringUtils.isEmpty(baseDeviceLogQuery.getLogId())) {
            query.addCriteria(Criteria.where("LogId").is(baseDeviceLogQuery.getLogId()));
        }
        if (!StringUtils.isEmpty(baseDeviceLogQuery.getProductNumberEqual())) {
            query.addCriteria(Criteria.where("ProductNumber").is(baseDeviceLogQuery.getProductNumberEqual()));
        }
        if (!StringUtils.isEmpty(baseDeviceLogQuery.getSerialNumberEqual())) {
            query.addCriteria(Criteria.where("SerialNumber").is(baseDeviceLogQuery.getSerialNumberEqual()));
        }
        if (!StringUtils.isEmpty(baseDeviceLogQuery.getLogType())) {
            query.addCriteria(Criteria.where("LogType").is(baseDeviceLogQuery.getLogType()));
        }
        if (!StringUtils.isEmpty(baseDeviceLogQuery.getSystemId())) {
            query.addCriteria(Criteria.where("SystemId").is(baseDeviceLogQuery.getSystemId()));
        }
        addDateRange(query, "UploadDateTime",
                baseDeviceLogQuery.getMinUploadDatetime(), baseDeviceLogQuery.getMaxUploadDatetime());
        addDateRange(query, "OperationDateTime",
                baseDeviceLogQuery.getMinOperationDate(), baseDeviceLogQuery.getMaxOperationDate());
        return mongoTemplate.find(query, BaseDeviceLog.class, collectionName);
    }

    /**
     * Adds a [min, max] range criterion on {@code field}; either bound may be null,
     * in which case only the present bound is applied (no-op when both are null).
     */
    private static void addDateRange(Query query, String field, Date min, Date max) {
        if (min != null && max != null) {
            query.addCriteria(Criteria.where(field).gte(min).lte(max));
        } else if (min != null) {
            query.addCriteria(Criteria.where(field).gte(min));
        } else if (max != null) {
            query.addCriteria(Criteria.where(field).lte(max));
        }
    }
}
com.mongodb.dao.BaseDeviceLogQuery :MongoDB查詢query的實體類

package com.mongodb.dao; import java.util.Date; /** * 查詢實體類 */
/**
 * Query parameter object for {@code UserDao#getByCollectionNameAndCondition}: every
 * non-empty field adds a filter criterion; the min/max date pairs form range filters.
 * Field names mirror the mapped document fields of {@code BaseDeviceLog}.
 */
public class BaseDeviceLogQuery{ /* sort/paging parameters (NOTE(review): not consumed by UserDaoImpl in this view — confirm usage elsewhere) */ private String orderByKey; private String orderByValue; private String pageNumber; private String pageSize; /* date filters: operationDate is a raw value; min/max pairs bound OperationDateTime and UploadDateTime ranges */ private String operationDate; private Date minOperationDate; private Date maxOperationDate; private Date minUploadDatetime; private Date maxUploadDatetime; /* equality filters used by UserDaoImpl */ private String logId; private String systemId; private String logType; private String deviceTypeId; /* fuzzy-match inputs (Like-suffixed) and remaining optional filters */ private String logSourceLike; private String logDescriptionLike; private String serviceCode; private String logCategory; private String logLevelEqual; private String[] logLevelIn; private String sourceLike; private String tempGuid; private String serialNumberEqual; private String productNumberEqual; private String calledSource; // getter/setter methods omitted
}
com.mongodb.dao.MongoConfig : 配置MongoDB連接,去除插入的_class

package com.mongodb.dao; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.convert.CustomConversions; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.convert.DbRefResolver; import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver; import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper; import org.springframework.data.mongodb.core.convert.MappingMongoConverter; import org.springframework.data.mongodb.core.mapping.MongoMappingContext; /** * 配置MongoDB連接,去除插入的_class */ @Configuration public class MongoConfig { @Bean public MappingMongoConverter mappingMongoConverter(MongoDbFactory factory, MongoMappingContext context, BeanFactory beanFactory){ DbRefResolver dbRefResolver=new DefaultDbRefResolver(factory); MappingMongoConverter mappingMongoConverter=new MappingMongoConverter(dbRefResolver,context); try { mappingMongoConverter.setCustomConversions(beanFactory.getBean(CustomConversions.class)); }catch (NoSuchBeanDefinitionException ignore){ } mappingMongoConverter.setTypeMapper(new DefaultMongoTypeMapper(null)); return mappingMongoConverter; } }
com.mongodb.domain.BaseDeviceLog :與MongoDB中數據相映射的實體類

package com.mongodb.domain;

import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

/**
 * MongoDB document for one device log entry. Field names in the stored document
 * use the PascalCase names given in @Field; they are populated by XmlParse from
 * the corresponding XML elements.
 */
@Document
public class BaseDeviceLog implements Serializable {

    @Id
    private ObjectId id;

    @Field("SerialNumber")
    private String serialNumber;

    @Field("ProductNumber")
    private String productNumber;

    @Field("LogType")
    private String logType;

    // BUGFIX: was @Field("") — an empty mapped-field name; the value comes from
    // the <SourceName> XML element, so map it to "SourceName".
    @Field("SourceName")
    private String sourceName;

    @Field("Description")
    private String description;

    @Field("LogPriority")
    private String logPriority;

    @Field("LogLevel")
    private String logLevel;

    // Parsed from <OperationDateTime> as UTC.
    @Field("OperationDateTime")
    private Date operationDateTime;

    @Field("Uid")
    private String uid;

    @Field("ServiceCode")
    private String serviceCode;

    // The <Item> section: one Key per measurement entry.
    @Field("Item")
    private List<Key> item;

    @Field("AttamentFilePath")
    private String attamentFilePath;

    @Field("AttamentFileMD5")
    private String attamentFileMD5;

    @Field("DevLogFilePath")
    private String devLogFilePath;

    @Field("DevLogFileMD5")
    private String devLogFileMD5;

    @Field("LogGuid")
    private String logGuid;

    @Field("CalledSource")
    private String calledSource;

    @Field("TempGuid")
    private String tempGuid;

    // Set at ingestion time (DateUtils.getUtcDate()).
    @Field("UploadDateTime")
    private Date uploadDateTime;

    @Field("SystemId")
    private String systemId;

    // getter/setter methods omitted
}
com.mongodb.domain.Key :與MongoDB數據相映射的實體類

package com.mongodb.domain; import java.io.Serializable; import java.util.List; public class Key implements Serializable { private String name; private String thredHoldMax; private String thredHoldMin; private String unit; private List<String> value; //getter,setter 方法省略
}
com.kafka.example.kafka.DecodeingKafka :實現kafka的反序列化接口

package com.kafka.example.kafka; import com.utils.common.BeanUtils; import org.apache.kafka.common.serialization.Deserializer; import java.util.Map; /** * 實現kafka的反序列化接口 */
/**
 * Kafka value deserializer: turns the record's byte payload back into an object
 * via BeanUtils.byte2Obj (counterpart of EncodeingKafka).
 * Raw Deserializer type replaced with Deserializer&lt;Object&gt; — same erasure,
 * so existing consumer configuration referencing this class is unaffected.
 */
public class DecodeingKafka implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration required.
    }

    @Override
    public Object deserialize(String topic, byte[] bytes) {
        return BeanUtils.byte2Obj(bytes);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
com.kafka.example.kafka.EncodeingKafka :實現kafka的序列化接口

package com.kafka.example.kafka;

import com.utils.common.BeanUtils;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Kafka value serializer: converts an object into its byte representation
 * via BeanUtils.bean2Byte (counterpart of DecodeingKafka).
 */
public class EncodeingKafka implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration required.
    }

    @Override
    public byte[] serialize(String topic, Object value) {
        return BeanUtils.bean2Byte(value);
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
com.mongodb.example.mongodbdemo.BatchListener 加載mongoDB的配置,監聽消費kafka消息

package com.mongodb.example.mongodbdemo;

import com.kafka.example.kafka.DecodeingKafka;
import com.mongodb.dao.UserDaoImpl;
import com.mongodb.domain.BaseDeviceLog;
import com.xml.utils.XmlParse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;
import com.utils.common.*;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Batch Kafka consumer: receives LogInfo messages from topic "log_topic",
 * parses the embedded XML into BaseDeviceLog documents and bulk-inserts them
 * into a per-system MongoDB collection.
 */
@ComponentScan(basePackages = "com.kafka.example.kafka,com.mongodb.dao,com.mongodb.domain")
@Component
public class BatchListener {

    @Autowired
    UserDaoImpl userDao;

    /**
     * Consumer properties: broker list, auto-commit every 1s, 15s session timeout,
     * String keys and custom-deserialized (DecodeingKafka) values.
     */
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.3.13.213:9092,10.3.13.197:9092,10.3.13.194:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DecodeingKafka.class);
        return props;
    }

    /**
     * Batch listener container factory with 5 concurrent consumers. Parameterized
     * types replace the raw ConcurrentKafkaListenerContainerFactory/
     * DefaultKafkaConsumerFactory usage (same erasure — bean wiring unchanged).
     */
    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> listenerContainer() {
        ConcurrentKafkaListenerContainerFactory<String, Object> container =
                new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
        container.setConcurrency(5);
        container.setBatchListener(true);
        return container;
    }

    /**
     * Consumes a batch of records. Each record's XML is parsed; the target
     * collection name is the first log's SystemId, suffixed with "_SL" when a
     * LogType is present. Records without a SystemId are skipped.
     */
    @KafkaListener(groupId = "test", topics = "log_topic", containerFactory = "batchContainerFactory")
    public void batchListener(List<ConsumerRecord<?, ?>> datas) {
        for (ConsumerRecord<?, ?> data : datas) {
            System.out.println("消息接收成功");
            LogInfo logInfo = (LogInfo) data.value();
            // Removed the dead "new ArrayList<>()" that was immediately overwritten.
            List<BaseDeviceLog> baseDeviceLogs = XmlParse.parseXmlFromString(logInfo.xmlString);
            if (CollectionUtils.isEmpty(baseDeviceLogs)
                    || StringUtils.isEmpty(baseDeviceLogs.get(0).getSystemId())) {
                System.out.println("發送的日志中沒有systemId");
                continue;
            }
            String collectionName = baseDeviceLogs.get(0).getSystemId();
            if (!StringUtils.isEmpty(baseDeviceLogs.get(0).getLogType())) {
                collectionName = collectionName + "_SL";
            }
            userDao.inserList(collectionName, baseDeviceLogs);
        }
    }
}
application.yml

server:
  port: 8081
spring:
  data:
    mongodb:
      # Multi-host connection string: hosts are comma-separated, the database name
      # appears once after the host list (the original bracketed, per-host form was
      # not a valid MongoDB URI).
      uri: mongodb://10.3.13.213:27017,10.3.13.197:27017,10.3.13.194:27017/servicecenter