SpringBoot多線程生產者消費者模型應用——排隊叫號實驗模擬(一)


1. 需求說明

目前的需求是在web端做一個排隊叫號系統的過程模擬,目前實現了前半部分,使用到了生產者消費者模型,雖然比較簡單,但還是記錄一下。
image0

2. 目前實現進度

完成了Thread A放客戶到緩沖區,Thread B從緩沖區取客戶並放入redis隊列的過程。
實現效果圖:
image1
image2

3.關鍵代碼

3.1 緩沖區實現

因為客戶有時間標簽,每個人的標簽基本上不一樣,所以緩沖區考慮並發的優先級隊列。

@Component
public class PatientTimeSeqBuffer {

	private static final PriorityBlockingQueue<PatientDto> BUFFER_QUEUE = new PriorityBlockingQueue<>(200);

	// 放入緩存區
	public void putPatientDto2Buffer(PatientDto patient) {
		try {
			BUFFER_QUEUE.put(patient);
		}catch (Exception e) {
			e.printStackTrace();
		}
	}

	// 取出優先級最高的元素
	public PatientDto getPatientDto() {
		try {
			return BUFFER_QUEUE.take();
		}catch (InterruptedException e) {
			e.printStackTrace();
		}
		return null;
	}
}

3.2 生產者Async流程

        @Autowired
	private PatientInfoService patientInfoService;  // 查客戶信息

	@Autowired
	private PatientTimeSeqBuffer buffer;  // 緩沖池

        @Async("arriveTaskExecutor")
	@Override
	public void timeSleepPut2Buffer(long startTime, List<PatientDto> patientDtoList) {
		while (!patientDtoList.isEmpty()) { // 給的總列表不空,就一直准備出隊
			PatientDto dto = patientDtoList.remove(0); // 移除隊首
			Long addBufferTime = dto.getCurrentTime(); // 以下皆為業務代碼
			if (addBufferTime - startTime > 0) {
				try {
					Thread.sleep(addBufferTime - startTime);  // 這里寫的不好,但沒想到別的解決方案,這里線程池最多只有一個線程
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			buffer.putPatientDto2Buffer(dto);
			log.info("{}進入了緩沖區at{}", dto, new Date());
		}
	}

3.3 消費者Async流程

        @Autowired
	private PatientInfoService patientInfoService;

	@Autowired
	private WeightedQueuing algorithmService;

	@Autowired
	private final QueueUtil queueUtil;
	public AsyncServiceImpl(QueueUtil queueUtil) {
		this.queueUtil = queueUtil;
	}
	@Async("computeTaskExecutor")
	@Override
	public void computeNextRoom() {
		// 依靠別的線程來打斷自己
		while (true) {
			// 先從緩存區中取客戶,若無客戶則阻塞自己
			PatientDto patientDto = buffer.getPatientDto();
			// 計算
			PatientInfo patientInfo = patientInfoService.getById(patientDto.getPatientId());
			RoomInfo result = algorithmService.computeNextRoom(patientInfo);
			// 准備插入數據
			PatientVo vo = new PatientVo(patientInfo);
			synchronized (queueUtil) {
				queueUtil.addPatient2Queue(result.getRoomCode(), vo);
				log.info("{} 去 {}, 在{}時", patientInfo.getPatientName(), result.getRoomName(), new Date());
			}
		}
	}

3.4 PatientDto及測試代碼

/*
* PatientDto.java
*/
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class PatientDto implements Comparable<PatientDto>{
	private Long patientId;
	private Integer queueNum;
	private String patientName;
	private String patientGender;
	private Long currentTime;

	public PatientDto(PatientVo vo, long time) {
		this.patientId = vo.getPatientId();
		this.queueNum = vo.getQueueNum();
		this.patientName = vo.getPatientName();
		this.patientGender = vo.getPatientGender();
		this.currentTime = time;
	}

	public PatientDto(PatientInfo info, long time) {
		this.patientId = info.getPatientId();
		this.queueNum = info.getQueueNum();
		this.patientName = info.getPatientName();
		this.patientGender = info.getPatientSex();
		this.currentTime = time;
	}

	@Override
	public String toString() {
		return "PatientDto{" +
				patientId +
				", " + patientName +
				", " + currentTime +
				'}';
	}

	/**
	 * 令時間早的優先級高
	 * @param o
	 * @return
	 */
	@Override
	public int compareTo(PatientDto o) {
		int compareTo = this.currentTime.compareTo(o.currentTime);
		compareTo = -compareTo;
		return compareTo;
	}
}
/*
* 測試Controller方法
*/
        @GetMapping("/simu/start")
	public Result ScheduleService() {
		QueryWrapper qw = new QueryWrapper();  // 篩選時間字段
		qw.eq("appointment_time", "2022-02-26 07:00:00");
		List<PatientInfo> patientInfoList = patientInfoService.list(qw);
		// 制作隨機間隔時間
		// TODO:隨機間隔時間可以更貼近現實
		Random random = new Random();
		LongStream longs = random.longs(patientInfoList.size(), 1000L, 3000L);
		long[] randomTimes = longs.toArray();
		List<PatientDto> patientDtoList = new LinkedList<>();
		long startTime = System.currentTimeMillis();  // 實驗開始時間記錄
		log.info("實驗開時間是: {}", startTime);
		// 裝載dto對象,時間為當前時間加上隨機時間
		for (int i = 0; i < patientInfoList.size(); i++) {
			patientDtoList.add(new PatientDto(patientInfoList.get(i), startTime + randomTimes[i]));
		}

		// 開始異步執行加入緩沖區操作
		asyncService.timeSleepPut2Buffer(startTime, patientDtoList);

		// 開始異步執行計算並分配, 希望分配的時候同一時刻僅有一個在分配
		asyncService.computeNextRoom();

		// 開啟websocket發送定時任務
		asyncService.websocketScheduling();

		return Result.success();
	}

線程池配置及其他業務解釋略.

4. 結語

自己今天終於勉強客服拖延症,嘗試做五分鍾,然后真正投入狀態,另外不要給自己太大壓力,該睡覺睡覺,該娛樂娛樂,把控好學習、放松和健身的度。目前自己技術水平還是很菜,只能做一個拼合怪。基礎方面,數據結構算法還沒怎么刷題,操作系統網絡幾乎沒看,java八股看了一點多線程也看不下去了。希望這周開始是個充實而平衡的一周。晚安。

5. 參考文章

1.深入理解 JUC:PriorityBlockingQueue
2.Java並發編程之PriorityBlockingQueue阻塞隊列詳解
3. csdn——PriorityBlockingQueue使用小結
4.生產者消費者模式——BlockingQueue
5.SpringBoot定時任務+自定義線程池
6.spring boot:使用多個線程池實現實現任務的線程池隔離(spring boot 2.3.2)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM