仿牛客網第五章


一、阻塞隊列

  • BlockingQueue

    • 解決線程通信的問題。
    • 阻塞方法:put、take。
  • 生產者消費者模式

    • 生產者:產生數據的線程。
    • 消費者:使用數據的線程。
  • 實現類

    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等。

img

面試常問:寫一個生產者消費者實現

public class Test {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(10);
        Producer p = new Producer(queue);
        Consumer c = new Consumer(queue);
        new Thread(p,"producer").start();
        new Thread(c,"consumer").start();
    }
}
class Consumer implements Runnable {
    private BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while(true){
                Thread.sleep(20);
                System.out.println("消費者消費了:" + queue.take());
            }
        }catch (InterruptedException e) {
                e.printStackTrace();
            }
    }
}
class Producer implements Runnable{
    private BlockingQueue<String> queue;
    public Producer(BlockingQueue<String> queue){
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                String tmp = "a product " + i + " from:" + Thread.currentThread().getName();
                System.out.println("生產者生產了:" + tmp);
                queue.put(tmp);
                Thread.sleep(20);
            }
        }catch (InterruptedException e) {
                e.printStackTrace();
        }
    }
}

二、Kafka入門

  • Kafka簡介

    • Kafka是一個分布式的流媒體平台。
    • 應用:消息系統、日志收集、用戶行為追蹤、流式處理。
  • Kafka特點

    • 高吞吐量、消息持久化、高可靠性、高擴展性。
  • Kafka術語

    • Broker、Zookeeper
    • Topic、Partition、Offset
    • Leader Replica 、Follower Replica

Kafka術語解釋

  • Broker:Kafka的服務器
  • Zookeeper:管理集群
  • Topic:點對點模式中每個消費者拿到的消息都不同,發布訂閱模式中消費者可能拿到同一份消息。Kafka采用發布訂閱模式,生產者把消息發布到的空間(位置)就叫Topic
  • Partition:是對Topic位置的分區,如下圖:
    img
  • Offset:就是消息在分區中的索引
    img
  • Leader Replica:主副本,可以處理請求
  • Follower Replica:從副本,只是用作備份

Kafka相關鏈接:官網

windows下更改配置

1.config目錄下zookeeper.properties修改

img2.server.properties

img

window下使用Kafka

1.啟動Zookeeper

img

2.啟動Kafka

img

啟動完成后就會出現我們設置的文件夾

img

3.創建主題

img查看所有主題判斷是否創建成功

img

4.往主題上發送消息

img發兩條消息

img

5.消費者接收消息

img

三、Spring整合Kafka

  • 引入依賴

    • spring-kafka
  • 配置Kafka

    • 配置server、consumer
  • 訪問Kafka

    • 生產者
      kafkaTemplate.send(topic, data);
    • 消費者
      @KafkaListener(topics = {“test”})
      public void handleMessage(ConsumerRecord record) {}

img

導包

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.0.RELEASE</version>
</dependency>

application.properties中配置

img

測試一波

@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTest {
    @Autowired
    KafkaProducer kafkaProducer;
    @Autowired
    KafkaConsumer kafkaConsumer;
    @Test
    public void testKafka(){
        kafkaProducer.sendMessage("test","hello world");
        kafkaProducer.sendMessage("test","I love java");
        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
@Component
class KafkaProducer{
    @Autowired
    private KafkaTemplate kafkaTemplate;
    public void sendMessage(String topic,String content){
        kafkaTemplate.send(topic,content);
    }
}
@Component
class KafkaConsumer{
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
    }
}

生產者主動發消息,消費者被動接收消息

四、發送系統通知

  • 觸發事件

    • 評論后,發布通知
    • 點贊后,發布通知
    • 關注后,發布通知
  • 處理事件

    • 封裝事件對象
    • 開發事件的生產者
    • 開發事件的消費者

img

封裝事件對象

public class Event {
//張三給李四點贊---userId是張三,entityUserId是李四
    private String topic;
    private int userId;
    private int entityType;
    private int entityId;
    private int entityUserId;
    private Map<String,Object> data = new HashMap<>();
    
    public String getTopic() {
        return topic;
    }
    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }
    public int getUserId() {
        return userId;
    }
    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }
    public int getEntityType() {
        return entityType;
    }
    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }
    public int getEntityId() {
        return entityId;
    }
    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }
    public int getEntityUserId() {
        return entityUserId;
    }
    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }
    public Map<String, Object> getData() {
        return data;
    }
    public Event setData(String key,String object) {
        this.data.put(key,object);
        return this;
    }
}
  • 注意set方法的修改是為了可以類似sb.append(" ").append(“1”);的操作
  • 注意setData方法的修改

開發事件的生產者

新建event包

@Component
public class EventProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    //處理事件,本質就是發送消息
    public  void fireEvent(Event event){
        //將事件發送到指定的主題
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }
}

開發事件的消費者

@Component
public class EventConsumer implements CommunityContant {
    //記日志
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
    //消息最終是要往message表中插入數據的
    @Autowired
    private MessageService messageService;
    @KafkaListener(topics = {TOPIC_LIKE,TOPIC_COMMENT,TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
        if(record==null||record.value()==null){
            logger.error("消息的內容為空");
            return;
        }
        Event event = JSONObject.parseObject(record.value().toString(),Event.class);
        if(event==null){
            logger.error("消息格式錯誤");
            return;
        }
        //發送站內通知,主要是構造message對象
        Message message = new Message();
        //User表中id為1代表系統用戶
        message.setFromId(SYSTEM_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());
        //為啥不直接把Event轉成json存在content里邊?
        Map<String,Object> content = new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(!event.getData().isEmpty()){
            for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);
    }
}

處理評論事件–CommentController

增加如下代碼

img

處理點贊事件–LikeController

增加如下代碼

img

因為重構了方法我們處理下js傳參

discuss-detail頁面

imgimg

處理關注事件–FollowController

follow方法

img

一些小bug

1.Kafka報錯: Topic(s) [xxx] is/are not present and missingTopicsFatal is true

  • 報錯原因: 消費監聽接口監聽的主題不存在時,默認會報錯
  • 解決方法: 配置文件中將listener的屬性missingTopicsFatal設置為false
spring.kafka.listener.missing-topics-fatal=false
• 1

2.以前寫的AOP記錄日志報錯

imgimg

因為消費者里也調了service方法所以上圖紅框有可能取不到request

解決方法

img

測試一波

發現message表中已經有數據了

img

五、顯示系統通知

  • 通知列表

    • 顯示評論、點贊、關注三種類型的通知
  • 通知詳情

    • 分頁顯示某一類主題所包含的通知
  • 未讀消息

    • 在頁面頭部顯示所有的未讀消息數量

img

列表頁寫MessageMapper

因為消息存到了message表里

img

imgimg

列表頁service層

MessageService類中

img

列表頁MessageController

@RequestMapping(path="/notice/list",method = RequestMethod.GET)
    public String getNoticeList(Model model){
        User user = hostHolder.getUser();
        //查詢評論類通知
        Message message = messageService.findLatestNotice(user.getId(),TOPIC_COMMENT);
        Map<String,Object> messageVo = new HashMap<>();
        if(message!=null){
            messageVo.put("message",message);
            String content = message.getContent();
            //{&quot;entityType&quot;:1,&quot;entityId&quot;:275,&quot;postId&quot;:275,&quot;userId&quot;:111}
            content = HtmlUtils.htmlUnescape(content);  //這步可以去除上方轉義字符
            HashMap<String,Object> data = JSONObject.parseObject(content, HashMap.class);
            //誰發給我?
            messageVo.put("user",userService.findUserById((Integer)data.get("userId")));
            messageVo.put("entityType",data.get("entityType"));
            messageVo.put("entityId",data.get("entityId"));
            messageVo.put("postId",data.get("postId"));
            int count = messageService.findNoticeCount(user.getId(),TOPIC_COMMENT);
            messageVo.put("count",count);
            int unread = messageService.findNoticeUnreadCount(user.getId(),TOPIC_COMMENT);
            messageVo.put("unread",unread);
        }
        model.addAttribute("commentNotice",messageVo);
        //查詢點贊類通知
        message = messageService.findLatestNotice(user.getId(),TOPIC_LIKE);
        messageVo = new HashMap<>();
        if(message!=null){
            messageVo.put("message",message);
            String content = message.getContent();
            //{&quot;entityType&quot;:1,&quot;entityId&quot;:275,&quot;postId&quot;:275,&quot;userId&quot;:111}
            content = HtmlUtils.htmlUnescape(content);  //這步可以去除上方轉義字符
            HashMap<String,Object> data = JSONObject.parseObject(content, HashMap.class);
            //誰發給我?
            messageVo.put("user",userService.findUserById((Integer)data.get("userId")));
            messageVo.put("entityType",data.get("entityType"));
            messageVo.put("entityId",data.get("entityId"));
            messageVo.put("postId",data.get("postId"));
            int count = messageService.findNoticeCount(user.getId(),TOPIC_LIKE);
            messageVo.put("count",count);
            int unread = messageService.findNoticeUnreadCount(user.getId(),TOPIC_LIKE);
            messageVo.put("unread",unread);
        }
        model.addAttribute("likeNotice",messageVo);
        //查詢關注類通知
        message = messageService.findLatestNotice(user.getId(),TOPIC_FOLLOW);
        messageVo = new HashMap<>();
        if(message!=null){
            messageVo.put("message",message);
            String content = message.getContent();
            //{&quot;entityType&quot;:1,&quot;entityId&quot;:275,&quot;postId&quot;:275,&quot;userId&quot;:111}
            content = HtmlUtils.htmlUnescape(content);  //這步可以去除上方轉義字符
            HashMap<String,Object> data = JSONObject.parseObject(content, HashMap.class);
            //誰關注了我?
            messageVo.put("user",userService.findUserById((Integer)data.get("userId")));
            messageVo.put("entityType",data.get("entityType"));
            messageVo.put("entityId",data.get("entityId"));
            int count = messageService.findNoticeCount(user.getId(),TOPIC_FOLLOW);
            messageVo.put("count",count);
            int unread = messageService.findNoticeUnreadCount(user.getId(),TOPIC_FOLLOW);
            messageVo.put("unread",unread);
        }
        model.addAttribute("followNotice",messageVo);
        //查詢未讀消息數量
        int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(),null);
        int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(),null);
        model.addAttribute("letterUnreadCount",letterUnreadCount);
        model.addAttribute("noticeUnreadCount",noticeUnreadCount);
        return  "/site/notice";
    }
}

還有一點修改時在letter頁面上也得顯示系統通知的數量所以在/message/list路徑下增加如下代碼,然后處理下letter.html的頁面

img

列表頁notice的頁面

先修改成Thymeleaf,其他填數據

img評論的模板填充,點贊和關注同理

img

測試一波

img

img

詳情頁MessageMapper

img

詳情頁MessageService

img

詳情頁MessageController

@RequestMapping(path = "/notice/detail/{topic}",method = RequestMethod.GET)
    public String getNoticeDetail(@PathVariable("topic")String topic,Page page,Model model){
        User user = hostHolder.getUser();
        //處理分頁
        page.setLimit(5);
        page.setRows(messageService.findNoticeCount(user.getId(),topic));
        page.setPath("/notice/detail/"+topic);
        List<Message> noticeList = messageService.findNotices(user.getId(),topic,page.getOffset(),page.getLimit());
        List<Map<String,Object>> noticeVoList = new ArrayList<>();
        if(noticeList!=null){
            for(Message notice:noticeList){
                Map<String,Object> map = new HashMap<>();
                //通知
                map.put("notice",notice);
                //內容
                String content = notice.getContent();
                content = HtmlUtils.htmlUnescape(content);
                HashMap<String,Object> data = JSONObject.parseObject(content, HashMap.class);
                map.put("user",userService.findUserById((Integer)data.get("userId")));
                map.put("entityType",data.get("entityType"));
                map.put("entityId",data.get("entityId"));
                map.put("postId",data.get("postId"));
                map.put("fromUser",userService.findUserById(notice.getFromId())); //系統名
                noticeVoList.add(map);
            }
        }
        model.addAttribute("notices",noticeVoList);
        //設置已讀
        List<Integer> ids = getLetterIds(noticeList);
        if(!ids.isEmpty()){
            messageService.readMessage(ids);
        }
        return "/site/notice-detail";
    }

詳情頁notice-detail頁面

imgimg

頭部總消息數通知

img

每個請求處理完都要查看所以用攔截器處理

寫一個攔截器

@Component
public class MessageInterator implements HandlerInterceptor {
    @Autowired
    private HostHolder hostHolder;
    @Autowired
    private MessageService messageService;
    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        User user = hostHolder.getUser();
        //user是否登錄,是否模板可以攜帶
        if(user!=null&&modelAndView!=null){
            int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
            int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
            int unreadCount = letterUnreadCount+noticeUnreadCount;
            modelAndView.addObject("unreadCount",unreadCount);
        }
    }
}

加入config中

imgindex頁面頭部處理一下

img


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM