springcloud+kafka集群


上節說了kafka在linux環境下怎么搭建集群。這節寫一下怎么在springcloud中以stream流方式去做kafka集群對接。

1.yml配置

#spring  Cloud  kafka    -- streams --
  cloud:
     stream:
        kafka:
          binder:
            minPartitionCount: 3  # 分區數量,主要就是為了減輕單台服務器的壓力,擴大並發量
            brokers: 192.168.100.100:9092,192.168.100.101:9092,192.168.100.102:9092  # kafka服務地址和端口
            autoCreateTopics: true
            autoAddPartitions: true

2.消息發送

@RestController
@RequestMapping("/kafka")
@EnableBinding(value = {WarningStreams.class})
public class kafkaTest {

    @Autowired
    private MessageService messageService;

    /**
     * 測試消息發送,入參就是你的topic,進行發送的時候就算kafka中沒有該topic,他也會自動創建一個你傳入的topic
* 這里面的Msg是我封裝的一個消息對象,可以是隨意的一個消息對象,字符串也可以 * @param topic
*/ @RequestMapping("/sendMsg") public void sendMsg(String topic){ // 循環發送6次消息,分別發送在不同的分區 for (int i=0; i<=5; i++ ) { Msg msg = new Msg(); msg.setData(null); msg.setTaskId("1"); msg.setMsg("測試消息發送"); msg.setMsgId(System.currentTimeMillis() + MathUtil.getFiveRandom()); msg.setSuccess("true"); msg.setCode("200"); msg.setMsgType(100); String result = messageService.sendControl(msg, topic); System.out.println(result); } }
}

messageService類:
@Service
public class MessageService {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private BinderAwareChannelResolver resolver;

    /**
     * 發送預警消息到指定topic,這里的topic是由平台編碼+平台名稱組成
     * 若發現kafka中沒有該topic,它會自動創建一個由平台編碼+平台名稱組成的topic
     * @param warnings
     * @param topic
     * @return
     */
    public String sendWarning(final Msg warnings, String topic) {
        logger.info("Sending warnings {}", warnings);

        // 獲取預警的topic,然后發送預警消息到kafka的topic
        MessageChannel messageChannel = resolver.resolveDestination(topic);
        messageChannel.send(MessageBuilder
                .withPayload(warnings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());

        return "send msg ok";
    }

    /**
     * 發送布控消息到指定topic,這里的topic是由平台編碼+平台名稱組成
     * 若發現kafka中沒有該topic,它會自動創建一個由平台編碼+平台名稱組成的topic
     * @param msg
     * @param topic
     * @return
     */
    public String sendControl(final Msg msg, String topic) {
        logger.info("Sending controlMsg {}", JSON.toJSONString(msg));
        // 獲取布控的topic,然后發送布控消息到kafka的topic
        MessageChannel messageChannel = resolver.resolveDestination(topic);
        messageChannel.send(MessageBuilder
                .withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());

        return "send msg ok";
    }
}
 
         

發送完畢后會在服務器中的topic下看到你傳入的那個topic,並且會有三個分區,每個分區分別對應三台服務器並且每台服務器中會有兩條消息,如下圖:

 

3.消息接收

@RestController
@RequestMapping("/kafka")
@EnableBinding(value = {WarningStreams.class})
public class kafkaTest {

    /**
     * 測試消息接收,接收對象用Object,否則收不到
     * @param playLoad
     */
    @StreamListener(WarningStreams.INPUT)
    public void receive(Object playLoad) {
        System.out.println("消息消費..result=="+ JSON.toJSONString(playLoad));
    }
}
當消息被消費后,分區中的數據釋放被清空,但是會保存在硬盤的log日志中。也就是在server.properties中你配置的log目錄

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM