redis隊列及多線程應用


  由於xxx平台上自己的博客已經很久沒更新了,一直以來都是用的印象筆記來做工作中知識的積累存根,不知不覺印象筆記里已經有了四、五百遍文章。為了從新開始能與廣大攻城獅共同提高技術能力與水平,隨決心另起爐灶在新的博客與大家分享

  經過一段時間項目的沉淀之后,對實際應用中的多線程開發及隊列使用產生了深厚的興趣,也將<<java並發編程實戰>>仔細的閱讀了兩三遍,也看了很多並發編程的實踐項目,也有了深刻的理解與在實踐中合理應用隊列、多線程開發的應用場景

  1、真實應用場景描述:

   由於一段時間以來要針對公司整個電商平台包括官網、移動端所有的交易數據進行統計,統計指標包括:pv、uv、實付金額、轉化率、毛利率等等,按照各種不同的維度來統計計算出當前交易系統的各個指標的數據,但要求該項目是獨立的,沒有任務其它資源的協助及接品提供。經過一番xxxx思考討論之后。業務上決定用以下解決方案:

    A: 用一個定時服務每隔10秒去別的系統數據庫抓取上一次查詢時間以來新確認的訂單(這種訂單表示已經支付完在或者客戶已經審核確認了),然后將這些訂單的唯一編號放入redis隊列。

    B: 由於用到了隊列,根據經驗自然而然的想到了  啟動單獨的線程去redis隊列中不斷獲取要統計處理的訂單編號,然后將獲取到的訂單編號放入線程池中進行訂單的統計任務處理。

 

    開發實現:

    FetchConfirmOrdersFromErpJob.java

 1 /**
 2      * 1、從redis中獲取上次查詢的時間戳
 3      * 2、將當前時間戳放入到redis中,以便 下次按這個時間查詢
 4      * 3、去erp訂單表查詢confirm_time>=上次查詢的時間的訂單,放入隊列中
 5      */
 6     @Scheduled(cron = "0/30 * * * * ?")
 7     public void start(){
 8         logger.info("FetchConfirmOrdersFromErpJob start................."+ new Date());
 9         StopWatch watch=new StopWatch();
10         watch.start();
11         //上次查詢的時間
12         String preQueryTimeStr=this.readRedisService.get(Constans.CACHE_PREQUERYORDERTIME);
13         
14         Date now=new Date();
15         if(StringUtils.isBlank(preQueryTimeStr)){
16             preQueryTimeStr=DateFormatUtils.format(DateUtils.addHours(now, -1), Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS);//第一次查詢之前一個小時的訂單
17 //            preQueryTimeStr="2015-05-07 10:00:00";//本地測試的時候使用
18         }
19         //設置當前時間為上次查詢的時間
20         this.writeRedisService.set(Constans.CACHE_PREQUERYORDERTIME, DateFormatUtils.format(now, Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS));
21         
22         List<Map<String, Object>> confirmOrderIds = this.erpOrderService.selectOrderIdbyConfirmtime(preQueryTimeStr);
23         if(confirmOrderIds==null){
24             logger.info("query confirmOrderIds is null,without order data need dealth..........");
25             return;
26         }
27         for (Map<String, Object> map : confirmOrderIds) {
         //將訂單編號放入隊列中
28 this.writeRedisService.lpush(Constans.CACHE_ORDERIDS, map.get("channel_orderid").toString()); 29 logger.info("=======lpush orderid:"+map.get("channel_orderid").toString()); 30 } 31 32 watch.stop(); 33 logger.info("FetchConfirmOrdersFromErpJob end................."+ new Date()+" total cost time:"+watch.getTime()+" dealth data count:"+confirmOrderIds.size()); 34 }

 

    OrderCalculate.java    隊列獲取訂單線程

 1 public class OrderCalculate {
 2 
 3     private static final Log logger = LogFactory.getLog(OrderCalculate.class);
 4     
 5     @Autowired
 6     private static WriteRedisService writeRedisService;
 7     
 8     private static ExecutorService threadPool=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4
 9             ,new TjThreadFactory("CalculateAmount"));
10     static{
11         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
12             @Override
13             public void run() {
14                 QueuePop.stop();
15                 threadPool.shutdown();
16             }
17         }));
18     }
19     
20     public void init(){
21         if(writeRedisService==null){
22             writeRedisService=SpringContext.getBean(WriteRedisService.class);
23         }
24         new Thread(new QueuePop(),"OrderIdQueuePop").start();//由於是用redis做的隊列,所以只要使用一個線程從隊列里拿就ok
25     }
26     
27     static class QueuePop implements Runnable{
28 
29         volatile static boolean stop=false;
30         
31         @Override
32         public void run() {
33             while(!stop){
34                 //不斷循環從隊列里取出訂單id
35                 String orderId=null;
36                 try {
37                     orderId = writeRedisService.rpop(Constans.CACHE_ORDERIDS);
38                     if(orderId!=null){
39                         logger.info("pop orderId:"+orderId);
                //將獲取的訂單編號交給訂單統計任務處理線程處理
40 threadPool.submit(new CalculateAmount(Integer.parseInt(orderId),new Date())); 41 } 42 } catch (Exception e1) { 43 logger.error("",e1); 44 } 45 //根據上線后的業務反饋來確定是否改成wait/notify策略來及時處理確認的訂單 46 try { 47 Thread.sleep(10); 48 } catch (InterruptedException e) { 49 logger.error("",e); 50 // Thread.currentThread().interrupt(); 51 //stop=true;//線程被打算繼續執行,不應該被關閉,保證該線程永遠不會死掉 52 } 53 } 54 } 55 56 public static void stop(){ 57 stop=true; 58 } 59 60 } 61 62 }

 

      CalculateAmoiunt.java   訂單任務處理

  1 public class CalculateAmount implements Runnable {
  2     private static final Log logger = LogFactory.getLog(CalculateAmount.class);
  3     private int orderId;
  4     private Date now;//確認時間  這個時間有一定的延遲,基本可以忽略,如果沒什么用
  5     private OrderService orderServices;
  6     private OrdHaveProductService ordHaveProductService;
  7     private OrdPayByCashbackService ordPayByCashbackService;
  8     private OrdPayByCouponService ordPayByCouponService;
  9     private OrdPayByGiftCardService ordPayByGiftCardService;
 10     private StatisticsService statisticsService;
 11     private WriteRedisService writeRedisService;
 12     private ReadRedisService readRedisService;
 13     private ErpOrderGoodsService erpOrderGoodsService;
 14     private ErpOrderService erpOrderService;
 15     
 16     
 17     public CalculateAmount(int orderId,Date now) {
 18         super();
 19         this.orderId = orderId;
 20         this.now=now;
 21         orderServices=SpringContext.getBean(OrderService.class);
 22         ordHaveProductService=SpringContext.getBean(OrdHaveProductService.class);
 23         ordPayByCashbackService=SpringContext.getBean(OrdPayByCashbackService.class);
 24         ordPayByCouponService=SpringContext.getBean(OrdPayByCouponService.class);
 25         ordPayByGiftCardService=SpringContext.getBean(OrdPayByGiftCardService.class);
 26         statisticsService=SpringContext.getBean(StatisticsService.class);
 27         writeRedisService=SpringContext.getBean(WriteRedisService.class);
 28         readRedisService=SpringContext.getBean(ReadRedisService.class);
 29         erpOrderGoodsService=SpringContext.getBean(ErpOrderGoodsService.class);
 30         erpOrderService=SpringContext.getBean(ErpOrderService.class);
 31     }
 32 
 33     @Override
 34     public void run() {
 35         logger.info("CalculateAmount task run start........orderId:"+orderId);
 36         StopWatch watch=new StopWatch();
 37         watch.start();
 38         /**
 39          * 取出訂單相關的所有數據同步到統計的庫中
 40          */
 41         //TODO  考慮要不要將下面所有操作放到一個事務里面
 42         List<Map<String, Object>> orders = this.orderServices.selectOrderById(orderId);
 43         if(orders!=null&&orders.size()>0){
 44             Map<String, Object> order = orders.get(0);
 45             
 46             String orderSN=U.nvl(order.get("OrderSN"));//訂單編號
 47             Integer userId=U.nvlInt(order.get("usr_UserID"),null);//用戶d
 48             Integer status=U.nvlInt(order.get("Status"),null);//狀態
 49             Date createTime=now;//(Date)order.get("CreateTime");//創建時間
 50             Date modifyTime=now;//(Date)order.get("ModifyTime");// 更新時間
 51             BigDecimal discountPrice=U.nvlDecimal(order.get("DiscountPrice"),null);//優惠總額  滿減金額
 52             BigDecimal payPrice=U.nvlDecimal(order.get("PayPrice"), null);//實付金額
 53             BigDecimal totalPrice=U.nvlDecimal(order.get("TotalPrice"), null);//總金額
 54             
 55             //從erp里查詢出訂單的確認時間
 56             int dbConfirmTime=0;
 57             try {
 58                 dbConfirmTime = this.erpOrderService.selectConfirmTimeByOrderId(orderId);
 59             } catch (Exception e2) {
 60                 logger.error("",e2);
 61             }
 62             Date ct=new Date(dbConfirmTime*1000L);
 63             
 64             int[] dates=U.getYearMonthDayHour(ct);//
 65             if(modifyTime!=null){
 66                 dates=U.getYearMonthDayHour(modifyTime);//
 67             }
 68             int year=dates[0];//
 69             int month=dates[1];//
 70             int day=dates[2];//
 71             int hour=dates[3];//小時
 72             
 73             String ordersId=orderId+"";//生成訂單id
 74             
 75             //查詢訂單的來源和搜索引擎關鍵字
 76             String source="";
 77             String seKeyWords="";
 78             List<OrdersData> orderDataList=this.statisticsService.selectOrdersDataByOrdersId(orderSN);
 79             if(orderDataList!=null&&!orderDataList.isEmpty()){
 80                 OrdersData ordersData = orderDataList.get(0);
 81                 source=ordersData.getSource();
 82                 seKeyWords=ordersData.getSeKeyWords();
 83             }
 84             
 85             //TODO 將訂單入庫
 86             ArrayList<RelOrders> relOrdersList = Lists.newArrayList();
 87             RelOrders relOrders=new RelOrders(orderSN,userId+"",Byte.valueOf(status+""),source,seKeyWords,IsCal.未計算.getFlag(),(byte)U.getSimpleYearByYear(year),(byte)month,(byte)day,(byte)hour,ct,createTime,modifyTime);
 88             relOrdersList.add(relOrders);
 89             
 90             try {
 91                 relOrders.setConfirmTime(ct);
 92                 //查詢RelOrders是否存在
 93                 RelOrders dbOrders=this.statisticsService.selectByPrimaryKey(orderSN);
 94                 if(dbOrders!=null){
 95                     //更新
 96                     dbOrders.setStatus(Byte.valueOf(status+""));
 97                     dbOrders.setConfirmTime(ct);
 98                     dbOrders.setModifyTime(modifyTime);
 99                     this.statisticsService.updateByPrimaryKeySelective(dbOrders);
100                     return;
101                 }else{
102                     Integer relResult=this.statisticsService.insertRelOrdersBatch(relOrdersList);
103                 }
104             } catch (Exception e) {
105                 logger.error("insertRelOrdersBatch error",e);
106             }
107             /**
108              * 查這個訂單的返現、優惠券、禮品卡  的金額
109              */
110             List<Map<String, Object>> cashs = this.ordPayByCashbackService.selectDecutionPriceByOrderId(orderId);
111             List<Map<String, Object>> coupons = this.ordPayByCouponService.selectDecutionPriceByOrderId(orderId);
112             
113             BigDecimal cashAmount=U.getValueByKey(cashs, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//返現金額
114             BigDecimal couponAmont=U.getValueByKey(coupons, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//紅包金額
115             /**
116              * 查詢出這個訂單的所有商品
117              */
118             List<Map<String, Object>> products=null;
119             Map<String,Object> productToKeyWordMap=Maps.newHashMap();
120             try {
121                 products = this.ordHaveProductService.selectByOrderId(orderId);
122                 List<OrdersItemData> ordersItemDataList=this.statisticsService.selectOrdersItemDataByOrdersId(orderSN);
123                 if(ordersItemDataList!=null){
124                     for (OrdersItemData ordersItemData : ordersItemDataList) {
125                         productToKeyWordMap.put(ordersItemData.getItemId(), ordersItemData.getKeyWords());
126                     }
127                 }
128             } catch (Exception e1) {
129                 logger.error("",e1);
130             }
131             if(products!=null){
132                 ArrayList<RelOrdersItem> relOrdersItemList = Lists.newArrayList();
133                 for (Map<String, Object> product : products) {
134                     Integer productId=U.nvlInt(product.get("pro_ProductID"), null);//商品Id
135                     Integer buyNo=U.nvlInt(product.get("BuyNo"), 0);//購買數量
136                     String SN=U.nvl(product.get("SN"),"");
137                     BigDecimal buyPrice=U.nvlDecimal(product.get("BuyPrice"), BigDecimal.ZERO);//購買價格
138                     BigDecimal buyTotalPrice=U.nvlDecimal(product.get("BuyTotalPrice"), null);//購買總價格
139                     BigDecimal productPayPrice=U.nvlDecimal(product.get("PayPrice"), null);//單品實付金額
140                     
141                     BigDecimal cost=null;//商品成本  TODO 調別人的接口
142                     BigDecimal realtimeAmount=null;//實付金額
143                     
144                     BigDecimal pdCashAmount=BigDecimal.ZERO;//每個商品的返現
145                     BigDecimal pdcouponAmont=BigDecimal.ZERO;//每個商品的優惠券
146                     
147                     //商品價格所占訂單比例
148                     if(buyTotalPrice!=null&&totalPrice!=null&&totalPrice.doubleValue()!=0){
149                         pdCashAmount=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(cashAmount).setScale(2,BigDecimal.ROUND_HALF_UP);
150                         pdcouponAmont=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(couponAmont).setScale(2,BigDecimal.ROUND_HALF_UP);
151                         discountPrice=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(discountPrice).setScale(2,BigDecimal.ROUND_HALF_UP);
152                     }
153                     
154                     realtimeAmount=buyTotalPrice.subtract((pdCashAmount.add(pdcouponAmont).add(discountPrice))).setScale(2,BigDecimal.ROUND_HALF_UP);
155                     
156                     RelOrdersItem item=new RelOrdersItem(U.randomUUID(),orderSN,productId,SN,buyNo,realtimeAmount,U.nvl(productToKeyWordMap.get(productId)));
157                     
158                     relOrdersItemList.add(item);
159                     
160                     //如果確認時間屬於同一天的話,將商品實付金額放入到redis排行榜中
161                     if((status==1||status==5||status==6||status==7||status==11)&&DateUtils.isSameDay(new Date(), ct)){
162                         //如果訂單的狀態是這幾種,剛將該商品加入到實付金額的排行 榜中
163                         dates=U.getYearMonthDayHour(ct);//
164                         int days=dates[2];
165                         //某一個商品某一天的實付金額
166                         BigDecimal itemRelAmount=BigDecimal.ZERO;
167                         //從redis里取出這個商品的實付金額,然后累加
168                         String itemRelAmountStr=readRedisService.get(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
169                         if(StringUtils.isNotBlank(itemRelAmountStr)){
170                             itemRelAmount=new BigDecimal(itemRelAmountStr);
171                         }
172                         realtimeAmount=itemRelAmount.add(realtimeAmount);
173                         writeRedisService.set(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days, realtimeAmount.toPlainString());
174                         writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
175                         writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTSS_KEY+days, realtimeAmount.doubleValue(), productId+"");
176                         //確認的銷量
177                         Long itemCount= writeRedisService.incrBy(Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days,buyNo);
178                         writeRedisService.zadd(Constans.CACHE_ITEMSALES_SS_KEY_PRDFIX+days, itemCount, productId+"");
179                         
180                         String itemType="";
181                         Map<String, String> pMap = this.readRedisService.hmget(Constans.CACHE_PRODUCT_KEY+productId);
182                         itemType=pMap.get("categoryId");
183                         if(StringUtils.isNotBlank(itemType)){
184                             if(ProductCategory.isGuanBai(itemType)){
185                                 //如果是白酒  官白的訪客數排行 
186                                 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTWHITESS_KEY+days,  realtimeAmount.doubleValue(), productId+"");//
187                                 //確認的銷量排行
188                                  this.writeRedisService.zadd(Constans.CACHE_ITEMSALESWHITE_SS_KEY_PRDFIX+days, itemCount, productId+"");//
189                             }else if(ProductCategory.isGuanHong(itemType)){
190                                 //官紅的訪客數排行 
191                                 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTREDSS_KEY+days,  realtimeAmount.doubleValue(), productId+"");//
192                                 //確認的銷量排行
193                                  this.writeRedisService.zadd(Constans.CACHE_ITEMSALESRED_SS_KEY_PRDFIX+days, itemCount, productId+"");//
194                             }
195                         }
196                         
197                         //某一個商品的銷量加入刪除列表
198                         writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
199                     }
200                 }
201                 try {
202                     //TODO 將訂單商品明細入庫
203                     this.statisticsService.insertRelOrdersItemBatch(relOrdersItemList);
204                     //再將訂單的狀態改為已計算
205                     this.statisticsService.updateIsCal(orderSN,IsCal.已計算.getFlag());//將是否計算改成已計算
206                     //該訂單的所有商品的成本同步到現在的庫中。
207                     this.calOrderProductCostSync(orderId,orderSN,products);
208                 } catch (Exception e) {
209                     logger.error("insertRelOrdersItemBatch or updateIsCal error",e);
210                 }
211             }
212         }
213         watch.stop();
214         logger.info("CalculateAmount task run end........total cost time:"+watch.getTime()+"   orderId:"+orderId);
215     }
216   
217     private void calOrderProductCostSync(int orderId,String orderSN,List<Map<String, Object>> products){
218         List<Map<String, Object>> ordersList = this.erpOrderGoodsService.selectProductCostByOrderSN(orderSN);
219         if(ordersList==null||ordersList.isEmpty()){
220             logger.error("according orderId to query some data from erp return is null.........");
221             return;
222         }
223         Map<String, String> itemIdToItemSnMap = U.convertToMapByList(products, "pro_ProductID", "SN");
224         
225         List<RelItemCosts> list=Lists.newArrayList();
226         for (Map<String, Object> map : ordersList) {
227             RelItemCosts itemCost=new RelItemCosts();
228             if(map==null){
229                 continue;
230             }
231             Integer itemId=U.nvlInt(map.get("goods_id"),-99);
232             BigDecimal costs=U.nvlDecimal(map.get("Dynamic_price"), BigDecimal.ZERO);
233             itemCost.setId(U.randomUUID());
234             itemCost.setOrdersId(orderId+"");
235             itemCost.setOrdersNo(orderSN);
236             itemCost.setItemId(itemId);
237             itemCost.setItemNo(itemIdToItemSnMap.get(itemId+""));
238             itemCost.setCosts(costs);
239             itemCost.setCreateTime(new Date());
240             itemCost.setModifyTime(new Date());
241             list.add(itemCost);
242         }
243         
244         this.statisticsService.insertRelItemCostsBatch(list);
245         
246     }
247     
248 }

 

  注意:

    1、redis2.6版本使用lpush、rpop出列的時候會丟失數據。換成2.8及以上的版本運行正常。

    2、由於應用會部署到多個結點,所以無法直接采用java的BlockingQueue阻塞隊列,幫采用redis提供的隊列支持。

    3、如果要做到統計的絕對實時,最好采用大數據的實時計算的解決方案:kafka+storm 來實現

  以上為隊列結合線程的實踐案例,供大家一起探討。

    轉載請注明出處 ,請大家尊重作者的勞動成果。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM