一、阻塞隊列
-
BlockingQueue
-
- 解決線程通信的問題。
- 阻塞方法:put、take。
-
生產者消費者模式
-
- 生產者:產生數據的線程。
- 消費者:使用數據的線程。
-
實現類
-
- ArrayBlockingQueue
- LinkedBlockingQueue
- PriorityBlockingQueue、SynchronousQueue、DelayQueue等。
面試常問:寫一個生產者消費者實現
public class Test {
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingDeque<>(10);
Producer p = new Producer(queue);
Consumer c = new Consumer(queue);
new Thread(p,"producer").start();
new Thread(c,"consumer").start();
}
}
class Consumer implements Runnable {
private BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while(true){
Thread.sleep(20);
System.out.println("消費者消費了:" + queue.take());
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Producer implements Runnable{
private BlockingQueue<String> queue;
public Producer(BlockingQueue<String> queue){
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
String tmp = "a product " + i + " from:" + Thread.currentThread().getName();
System.out.println("生產者生產了:" + tmp);
queue.put(tmp);
Thread.sleep(20);
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
二、Kafka入門
-
Kafka簡介
-
- Kafka是一個分布式的流媒體平台。
- 應用:消息系統、日志收集、用戶行為追蹤、流式處理。
-
Kafka特點
-
- 高吞吐量、消息持久化、高可靠性、高擴展性。
-
Kafka術語
-
- Broker、Zookeeper
- Topic、Partition、Offset
- Leader Replica 、Follower Replica
Kafka術語解釋
- Broker:Kafka的服務器
- Zookeeper:管理集群
- Topic:點對點模式中每個消費者拿到的消息都不同,發布訂閱模式中消費者可能拿到同一份消息。Kafka采用發布訂閱模式,生產者把消息發布到的空間(位置)就叫Topic
- Partition:是對Topic位置的分區,如下圖:
- Offset:就是消息在分區中的索引
- Leader Replica:主副本,可以處理請求
- Follower Replica:從副本,只是用作備份
Kafka相關鏈接:官網
windows下更改配置
1.config目錄下zookeeper.properties修改
2.server.properties
window下使用Kafka
1.啟動Zookeeper
2.啟動Kafka
啟動完成后就會出現我們設置的文件夾
3.創建主題
查看所有主題判斷是否創建成功
4.往主題上發送消息
發兩條消息
5.消費者接收消息
三、Spring整合Kafka
-
引入依賴
-
- spring-kafka
-
配置Kafka
-
- 配置server、consumer
-
訪問Kafka
-
- 生產者
kafkaTemplate.send(topic, data); - 消費者
@KafkaListener(topics = {“test”})
public void handleMessage(ConsumerRecord record) {}
- 生產者
導包
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
application.properties中配置
測試一波
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTest {
@Autowired
KafkaProducer kafkaProducer;
@Autowired
KafkaConsumer kafkaConsumer;
@Test
public void testKafka(){
kafkaProducer.sendMessage("test","hello world");
kafkaProducer.sendMessage("test","I love java");
try {
Thread.sleep(1000*10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer{
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic,String content){
kafkaTemplate.send(topic,content);
}
}
@Component
class KafkaConsumer{
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record){
System.out.println(record.value());
}
}
生產者主動發消息,消費者被動接收消息
四、發送系統通知
-
觸發事件
-
- 評論后,發布通知
- 點贊后,發布通知
- 關注后,發布通知
-
處理事件
-
- 封裝事件對象
- 開發事件的生產者
- 開發事件的消費者
封裝事件對象
public class Event {
//張三給李四點贊---userId是張三,entityUserId是李四
private String topic;
private int userId;
private int entityType;
private int entityId;
private int entityUserId;
private Map<String,Object> data = new HashMap<>();
public String getTopic() {
return topic;
}
public Event setTopic(String topic) {
this.topic = topic;
return this;
}
public int getUserId() {
return userId;
}
public Event setUserId(int userId) {
this.userId = userId;
return this;
}
public int getEntityType() {
return entityType;
}
public Event setEntityType(int entityType) {
this.entityType = entityType;
return this;
}
public int getEntityId() {
return entityId;
}
public Event setEntityId(int entityId) {
this.entityId = entityId;
return this;
}
public int getEntityUserId() {
return entityUserId;
}
public Event setEntityUserId(int entityUserId) {
this.entityUserId = entityUserId;
return this;
}
public Map<String, Object> getData() {
return data;
}
public Event setData(String key,String object) {
this.data.put(key,object);
return this;
}
}
- 注意set方法的修改是為了可以類似sb.append(" ").append(“1”);的操作
- 注意setData方法的修改
開發事件的生產者
新建event包
@Component
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
//處理事件,本質就是發送消息
public void fireEvent(Event event){
//將事件發送到指定的主題
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
}
}
開發事件的消費者
@Component
public class EventConsumer implements CommunityContant {
//記日志
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
//消息最終是要往message表中插入數據的
@Autowired
private MessageService messageService;
@KafkaListener(topics = {TOPIC_LIKE,TOPIC_COMMENT,TOPIC_FOLLOW})
public void handleCommentMessage(ConsumerRecord record){
if(record==null||record.value()==null){
logger.error("消息的內容為空");
return;
}
Event event = JSONObject.parseObject(record.value().toString(),Event.class);
if(event==null){
logger.error("消息格式錯誤");
return;
}
//發送站內通知,主要是構造message對象
Message message = new Message();
//User表中id為1代表系統用戶
message.setFromId(SYSTEM_ID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(new Date());
//為啥不直接把Event轉成json存在content里邊?
Map<String,Object> content = new HashMap<>();
content.put("userId",event.getUserId());
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());
if(!event.getData().isEmpty()){
for(Map.Entry<String,Object> entry:event.getData().entrySet()){
content.put(entry.getKey(),entry.getValue());
}
}
message.setContent(JSONObject.toJSONString(content));
messageService.addMessage(message);
}
}
處理評論事件–CommentController
增加如下代碼
處理點贊事件–LikeController
增加如下代碼
因為重構了方法我們處理下js傳參
discuss-detail頁面
處理關注事件–FollowController
follow方法
一些小bug
1.Kafka報錯: Topic(s) [xxx] is/are not present and missingTopicsFatal is true
- 報錯原因: 消費監聽接口監聽的主題不存在時,默認會報錯
- 解決方法: 配置文件中將listener的屬性missingTopicsFatal設置為false
spring.kafka.listener.missing-topics-fatal=false
• 1
2.以前寫的AOP記錄日志報錯
因為消費者里也調了service方法所以上圖紅框有可能取不到request
解決方法
測試一波
發現message表中已經有數據了
五、顯示系統通知
-
通知列表
-
- 顯示評論、點贊、關注三種類型的通知
-
通知詳情
-
- 分頁顯示某一類主題所包含的通知
-
未讀消息
-
- 在頁面頭部顯示所有的未讀消息數量
列表頁寫MessageMapper
因為消息存到了message表里
列表頁service層
MessageService類中
列表頁MessageController
@RequestMapping(path="/notice/list",method = RequestMethod.GET)
public String getNoticeList(Model model){
User user = hostHolder.getUser();
//查詢評論類通知
Message message = messageService.findLatestNotice(user.getId(),TOPIC_COMMENT);
Map<String,Object> messageVo = new HashMap<>();
if(message!=null){
messageVo.put("message",message);
String content = message.getContent();
//{"entityType":1,"entityId":275,"postId":275,"userId":111}
content = HtmlUtils.htmlUnescape(content); //這步可以去除上方轉義字符
HashMap<String,Object> data = JSONObject.parseObject(content, HashMap.class);
//誰發給我?
messageVo.put("user",userService.findUserById((Integer)data.get("userId")));
messageVo.put("entityType",data.get("entityType"));
messageVo.put("entityId",data.get("entityId"));
messageVo.put("postId",data.get("postId"));
int count = messageService.findNoticeCount(user.getId(),TOPIC_COMMENT);
messageVo.put("count",count);
int unread = messageService.findNoticeUnreadCount(user.getId(),TOPIC_COMMENT);
messageVo.put("unread",unread);
}
model.addAttribute("commentNotice",messageVo);
//查詢點贊類通知
message = messageService.findLatestNotice(user.getId(),TOPIC_LIKE);
messageVo = new HashMap<>();
if(message!=null){
messageVo.put("message",message);
String content = message.getContent();
//{"entityType":1,"entityId":275,"postId":275,"userId":111}
content = HtmlUtils.htmlUnescape(content); //這步可以去除上方轉義字符
HashMap<String,Object> data = JSONObject.parseObject(content, HashMap.class);
//誰發給我?
messageVo.put("user",userService.findUserById((Integer)data.get("userId")));
messageVo.put("entityType",data.get("entityType"));
messageVo.put("entityId",data.get("entityId"));
messageVo.put("postId",data.get("postId"));
int count = messageService.findNoticeCount(user.getId(),TOPIC_LIKE);
messageVo.put("count",count);
int unread = messageService.findNoticeUnreadCount(user.getId(),TOPIC_LIKE);
messageVo.put("unread",unread);
}
model.addAttribute("likeNotice",messageVo);
//查詢關注類通知
message = messageService.findLatestNotice(user.getId(),TOPIC_FOLLOW);
messageVo = new HashMap<>();
if(message!=null){
messageVo.put("message",message);
String content = message.getContent();
//{"entityType":1,"entityId":275,"postId":275,"userId":111}
content = HtmlUtils.htmlUnescape(content); //這步可以去除上方轉義字符
HashMap<String,Object> data = JSONObject.parseObject(content, HashMap.class);
//誰關注了我?
messageVo.put("user",userService.findUserById((Integer)data.get("userId")));
messageVo.put("entityType",data.get("entityType"));
messageVo.put("entityId",data.get("entityId"));
int count = messageService.findNoticeCount(user.getId(),TOPIC_FOLLOW);
messageVo.put("count",count);
int unread = messageService.findNoticeUnreadCount(user.getId(),TOPIC_FOLLOW);
messageVo.put("unread",unread);
}
model.addAttribute("followNotice",messageVo);
//查詢未讀消息數量
int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(),null);
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(),null);
model.addAttribute("letterUnreadCount",letterUnreadCount);
model.addAttribute("noticeUnreadCount",noticeUnreadCount);
return "/site/notice";
}
}
還有一點修改時在letter頁面上也得顯示系統通知的數量所以在/message/list路徑下增加如下代碼,然后處理下letter.html的頁面
列表頁notice的頁面
先修改成Thymeleaf,其他填數據
評論的模板填充,點贊和關注同理
測試一波
詳情頁MessageMapper
詳情頁MessageService
詳情頁MessageController
@RequestMapping(path = "/notice/detail/{topic}",method = RequestMethod.GET)
public String getNoticeDetail(@PathVariable("topic")String topic,Page page,Model model){
User user = hostHolder.getUser();
//處理分頁
page.setLimit(5);
page.setRows(messageService.findNoticeCount(user.getId(),topic));
page.setPath("/notice/detail/"+topic);
List<Message> noticeList = messageService.findNotices(user.getId(),topic,page.getOffset(),page.getLimit());
List<Map<String,Object>> noticeVoList = new ArrayList<>();
if(noticeList!=null){
for(Message notice:noticeList){
Map<String,Object> map = new HashMap<>();
//通知
map.put("notice",notice);
//內容
String content = notice.getContent();
content = HtmlUtils.htmlUnescape(content);
HashMap<String,Object> data = JSONObject.parseObject(content, HashMap.class);
map.put("user",userService.findUserById((Integer)data.get("userId")));
map.put("entityType",data.get("entityType"));
map.put("entityId",data.get("entityId"));
map.put("postId",data.get("postId"));
map.put("fromUser",userService.findUserById(notice.getFromId())); //系統名
noticeVoList.add(map);
}
}
model.addAttribute("notices",noticeVoList);
//設置已讀
List<Integer> ids = getLetterIds(noticeList);
if(!ids.isEmpty()){
messageService.readMessage(ids);
}
return "/site/notice-detail";
}
詳情頁notice-detail頁面
頭部總消息數通知
每個請求處理完都要查看所以用攔截器處理
寫一個攔截器
@Component
public class MessageInterator implements HandlerInterceptor {
@Autowired
private HostHolder hostHolder;
@Autowired
private MessageService messageService;
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
User user = hostHolder.getUser();
//user是否登錄,是否模板可以攜帶
if(user!=null&&modelAndView!=null){
int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
int unreadCount = letterUnreadCount+noticeUnreadCount;
modelAndView.addObject("unreadCount",unreadCount);
}
}
}
加入config中
index頁面頭部處理一下