在nginx這一層,接收到訪問請求的時候,就把請求的流量上報發送給kafka
storm才能去消費kafka中的實時的訪問日志,然后去進行緩存熱數據的統計
從lua腳本直接創建一個kafka producer,發送數據到kafka
# Install the lua-resty-kafka client library into the nginx lualib directory.
# The cp step below expects the archive to be unpacked under /usr/local.
cd /usr/local
# Save the archive under the name the unzip step expects; without -O wget names it after the URL (master.zip).
wget -O lua-resty-kafka-master.zip https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
yum install -y unzip
unzip lua-resty-kafka-master.zip
cp -rf /usr/local/lua-resty-kafka-master/lib/resty /usr/hello/lualib
# Reload nginx after the library has been copied.
nginx -s reload
lua腳本:
-- Serialise the current request to JSON and send it to the Kafka topic
-- "access-log" through an asynchronous lua-resty-kafka producer.
local cjson = require("cjson")
local producer = require("resty.kafka.producer")

-- Kafka brokers the producer connects to.
local broker_list = {
    { host = "192.168.31.187", port = 9092 },
    { host = "192.168.31.19", port = 9092 },
    { host = "192.168.31.227", port = 9092 }
}

-- Collect the request details that make up one log record.
local log_json = {}
log_json.headers = ngx.req.get_headers()

local request_args = ngx.req.get_uri_args()
log_json.uri_args = request_args

-- read_body() is called before get_body_data() below.
-- NOTE(review): its result is stored under "body"; it looks like this is always nil - confirm.
log_json.body = ngx.req.read_body()
log_json.http_version = ngx.req.http_version()
log_json.method = ngx.req.get_method()
log_json.raw_reader = ngx.req.raw_header()
log_json.body_data = ngx.req.get_body_data()

local payload = cjson.encode(log_json)

-- The productId query argument is used as the Kafka message key.
local product_id = request_args["productId"]

local async_producer = producer:new(broker_list, { producer_type = "async" })
local sent, send_err = async_producer:send("access-log", product_id, payload)

if not sent then
    ngx.log(ngx.ERR, "kafka send err:", send_err)
    return
end
兩台機器上都這樣做,才能統一上報流量到kafka
# Create the "access-log" topic with 1 partition and replication factor 1.
bin/kafka-topics.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --replication-factor 1 --partitions 1 --create
# Start a console consumer that reads the "access-log" topic from the beginning.
bin/kafka-console-consumer.sh --zookeeper 192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181 --topic access-log --from-beginning
(1)kafka在187上的節點死掉了,可能是虛擬機的問題,殺掉進程,重新啟動一下
nohup bin/kafka-server-start.sh config/server.properties &
(2)需要在nginx.conf中,http部分,加入resolver 8.8.8.8;
(3)需要在kafka中加入advertised.host.name = 192.168.31.187,重啟三個kafka進程
(4)需要啟動eshop-cache緩存服務,因為nginx中的本地緩存可能不在了
基於storm+kafka完成商品訪問次數實時統計拓撲的開發:
總結思路:
1、kafka consumer spout
單獨的線程消費,寫入隊列
nextTuple,每次都是判斷隊列有沒有數據,有的話再去獲取並發射出去,不能阻塞
2、日志解析bolt
3、商品訪問次數統計bolt
4、基於LRUMap完成統計
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; import com.roncoo.eshop.storm.bolt.LogParseBolt; import com.roncoo.eshop.storm.bolt.ProductCountBolt; import com.roncoo.eshop.storm.spout.AccessLogKafkaSpout; /** * 熱數據統計拓撲 * @author Administrator * */ public class HotProductTopology { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("AccessLogKafkaSpout", new AccessLogKafkaSpout(), 1); builder.setBolt("LogParseBolt", new LogParseBolt(), 5) .setNumTasks(5) .shuffleGrouping("AccessLogKafkaSpout"); builder.setBolt("ProductCountBolt", new ProductCountBolt(), 5) .setNumTasks(10) .fieldsGrouping("LogParseBolt", new Fields("productId")); Config config = new Config(); if(args != null && args.length > 1) { config.setNumWorkers(3); try { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("HotProductTopology", config, builder.createTopology()); Utils.sleep(30000); cluster.shutdown(); } } }
import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import com.alibaba.fastjson.JSONObject; /** * 日志解析的bolt * @author Administrator * */ public class LogParseBolt extends BaseRichBolt { private static final long serialVersionUID = -8017609899644290359L; private OutputCollector collector; @SuppressWarnings("rawtypes") public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { String message = tuple.getStringByField("message"); JSONObject messageJSON = JSONObject.parseObject(message); JSONObject uriArgsJSON = messageJSON.getJSONObject("uri_args"); Long productId = uriArgsJSON.getLong("productId"); if(productId != null) { collector.emit(new Values(productId)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("productId")); } }
import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.storm.shade.org.json.simple.JSONArray; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.trident.util.LRUMap; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.Utils; import com.roncoo.eshop.storm.zk.ZooKeeperSession; /** * 商品訪問次數統計bolt * @author Administrator * */ public class ProductCountBolt extends BaseRichBolt { private static final long serialVersionUID = -8761807561458126413L; private LRUMap<Long, Long> productCountMap = new LRUMap<Long, Long>(1000); private ZooKeeperSession zkSession; private int taskid; @SuppressWarnings("rawtypes") public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.zkSession = ZooKeeperSession.getInstance(); this.taskid = context.getThisTaskId(); new Thread(new ProductCountThread()).start(); // 1、將自己的taskid寫入一個zookeeper node中,形成taskid的列表 // 2、然后每次都將自己的熱門商品列表,寫入自己的taskid對應的zookeeper節點 // 3、然后這樣的話,並行的預熱程序才能從第一步中知道,有哪些taskid // 4、然后並行預熱程序根據每個taskid去獲取一個鎖,然后再從對應的znode中拿到熱門商品列表 initTaskId(context.getThisTaskId()); } private void initTaskId(int taskid) { // ProductCountBolt所有的task啟動的時候, 都會將自己的taskid寫到同一個node的值中 // 格式就是逗號分隔,拼接成一個列表 // 111,211,355 zkSession.acquireDistributedLock(); String taskidList = zkSession.getNodeData(); if(!"".equals(taskidList)) { taskidList += "," + taskid; } else { taskidList += taskid; } zkSession.setNodeData("/taskid-list", taskidList); zkSession.releaseDistributedLock(); } private class ProductCountThread implements Runnable { public void run() { List<Map.Entry<Long, Long>> topnProductList = new ArrayList<Map.Entry<Long, Long>>(); while(true) { topnProductList.clear(); int topn = 3; if(productCountMap.size() == 0) { Utils.sleep(100); continue; } for(Map.Entry<Long, Long> productCountEntry : 
productCountMap.entrySet()) { if(topnProductList.size() == 0) { topnProductList.add(productCountEntry); } else { // 比較大小,生成最熱topn的算法有很多種 // 但是我這里為了簡化起見,不想引入過多的數據結構和算法的的東西 // 很有可能還是會有漏洞,但是我已經反復推演了一下了,而且也畫圖分析過這個算法的運行流程了 boolean bigger = false; for(int i = 0; i < topnProductList.size(); i++){ Map.Entry<Long, Long> topnProductCountEntry = topnProductList.get(i); if(productCountEntry.getValue() > topnProductCountEntry.getValue()) { int lastIndex = topnProductList.size() < topn ? topnProductList.size() - 1 : topn - 2; for(int j = lastIndex; j >= i; j--) { topnProductList.set(j + 1, topnProductList.get(j)); } topnProductList.set(i, productCountEntry); bigger = true; break; } } if(!bigger) { if(topnProductList.size() < topn) { topnProductList.add(productCountEntry); } } } } // 獲取到一個topn list String topnProductListJSON = JSONArray.toJSONString(topnProductList); zkSession.setNodeData("/task-hot-product-list-" + taskid, topnProductListJSON); Utils.sleep(5000); } } } public void execute(Tuple tuple) { Long productId = tuple.getLongByField("productId"); Long count = productCountMap.get(productId); if(count == null) { count = 0L; } count++; productCountMap.put(productId, count); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ArrayBlockingQueue; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; /** * kafka消費數據的spout */ public class AccessLogKafkaSpout extends BaseRichSpout { private static final long serialVersionUID = 8698470299234327074L; private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(1000); private SpoutOutputCollector collector; @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; startKafkaConsumer(); } @SuppressWarnings("rawtypes") private void startKafkaConsumer() { Properties props = new Properties(); props.put("zookeeper.connect", "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181"); props.put("group.id", "eshop-cache-group"); props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig consumerConfig = new ConsumerConfig(props); ConsumerConnector consumerConnector = Consumer. 
createJavaConsumerConnector(consumerConfig); String topic = "access-log"; Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 1); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); for (KafkaStream stream : streams) { new Thread(new KafkaMessageProcessor(stream)).start(); } } private class KafkaMessageProcessor implements Runnable { @SuppressWarnings("rawtypes") private KafkaStream kafkaStream; @SuppressWarnings("rawtypes") public KafkaMessageProcessor(KafkaStream kafkaStream) { this.kafkaStream = kafkaStream; } @SuppressWarnings("unchecked") public void run() { ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); while (it.hasNext()) { String message = new String(it.next().message()); try { queue.put(message); } catch (InterruptedException e) { e.printStackTrace(); } } } } public void nextTuple() { if(queue.size() > 0) { try { String message = queue.take(); collector.emit(new Values(message)); } catch (Exception e) { e.printStackTrace(); } } else { Utils.sleep(100); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } }
import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * ZooKeeperSession * @author Administrator * */ public class ZooKeeperSession { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); private ZooKeeper zookeeper; public ZooKeeperSession() { // 去連接zookeeper server,創建會話的時候,是異步去進行的 // 所以要給一個監聽器,說告訴我們什么時候才是真正完成了跟zk server的連接 try { this.zookeeper = new ZooKeeper( "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 50000, new ZooKeeperWatcher()); // 給一個狀態CONNECTING,連接中 System.out.println(zookeeper.getState()); try { // CountDownLatch // java多線程並發同步的一個工具類 // 會傳遞進去一些數字,比如說1,2 ,3 都可以 // 然后await(),如果數字不是0,那么久卡住,等待 // 其他的線程可以調用coutnDown(),減1 // 如果數字減到0,那么之前所有在await的線程,都會逃出阻塞的狀態 // 繼續向下運行 connectedSemaphore.await(); } catch(InterruptedException e) { e.printStackTrace(); } System.out.println("ZooKeeper session established......"); } catch (Exception e) { e.printStackTrace(); } } /** * 獲取分布式鎖 * @param productId */ public void acquireDistributedLock() { String path = "/taskid-list-lock"; try { zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success to acquire lock for taskid-list-lock"); } catch (Exception e) { // 如果那個商品對應的鎖的node,已經存在了,就是已經被別人加鎖了,那么就這里就會報錯 // NodeExistsException int count = 0; while(true) { try { Thread.sleep(1000); zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e2) { count++; System.out.println("the " + count + " times try to acquire lock for taskid-list-lock......"); continue; } System.out.println("success to acquire lock for taskid-list-lock after " + count + " times try......"); break; } } } /** * 釋放掉一個分布式鎖 * @param productId */ public 
void releaseDistributedLock() { String path = "/taskid-list-lock"; try { zookeeper.delete(path, -1); System.out.println("release the lock for taskid-list-lock......"); } catch (Exception e) { e.printStackTrace(); } } public String getNodeData() { try { return new String(zookeeper.getData("/taskid-list", false, new Stat())); } catch (Exception e) { e.printStackTrace(); } return ""; } public void setNodeData(String path, String data) { try { zookeeper.setData(path, data.getBytes(), -1); } catch (Exception e) { e.printStackTrace(); } } /** * 建立zk session的watcher * @author Administrator * */ private class ZooKeeperWatcher implements Watcher { public void process(WatchedEvent event) { System.out.println("Receive watched event: " + event.getState()); if(KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } } } /** * 封裝單例的靜態內部類 * @author Administrator * */ private static class Singleton { private static ZooKeeperSession instance; static { instance = new ZooKeeperSession(); } public static ZooKeeperSession getInstance() { return instance; } } /** * 獲取單例 * @return */ public static ZooKeeperSession getInstance() { return Singleton.getInstance(); } /** * 初始化單例的便捷方法 */ public static void init() { getInstance(); } }
基於雙重zookeeper分布式鎖完成分布式並行緩存預熱:
1、服務啟動的時候,進行緩存預熱
2、從zk中讀取taskid列表
3、依次遍歷每個taskid,嘗試獲取分布式鎖,如果獲取不到,快速報錯,不要等待,因為說明已經有其他服務實例在預熱了
4、直接嘗試獲取下一個taskid的分布式鎖
5、即使獲取到了分布式鎖,也要檢查一下這個taskid的預熱狀態,如果已經被預熱過了,就不再預熱了
6、執行預熱操作,遍歷productid列表,查詢數據,然后寫ehcache和redis
7、預熱完成后,設置taskid對應的預熱狀態
ZKsession重載兩個方法:
/**
 * Tries once to acquire a distributed lock by creating an ephemeral node at
 * {@code path}; does not wait or retry.
 *
 * @param path znode path used as the lock
 * @return true if the node was created, false if creation failed
 */
public boolean acquireFastFailedDistributedLock(String path) {
    try {
        zookeeper.create(path, "".getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("success to acquire lock for " + path);
        return true;
    } catch (Exception e) {
        System.out.println("fail to acquire lock for " + path);
    }
    return false;
}

/**
 * Releases a distributed lock by deleting the node at {@code path}.
 * Failures are printed, not thrown.
 *
 * @param path znode path used as the lock
 */
public void releaseDistributedLock(String path) {
    try {
        zookeeper.delete(path, -1);
        System.out.println("release the lock for " + path + "......");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Reads the data of the node at {@code path}.
 *
 * @param path znode path
 * @return the node data as a string, or "" when reading fails
 */
public String getNodeData(String path) {
    try {
        return new String(zookeeper.getData(path, false, new Stat()));
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "";
}

/**
 * Writes {@code data} to the node at {@code path}, creating the node as a
 * persistent node when it does not exist yet. Failures are printed, not
 * thrown.
 *
 * @param path znode path
 * @param data value to store
 */
public void setNodeData(String path, String data) {
    try {
        // setData alone fails on a missing node, so the first write to a
        // path has to create it.
        if (zookeeper.exists(path, false) == null) {
            zookeeper.create(path, data.getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zookeeper.setData(path, data.getBytes(), -1);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/** * 獲取分布式鎖 */ public void acquireDistributedLock(String path) { try { zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("success to acquire lock for " + path); } catch (Exception e) { // 如果那個商品對應的鎖的node,已經存在了,就是已經被別人加鎖了,那么就這里就會報錯 // NodeExistsException int count = 0; while(true) { try { Thread.sleep(1000); zookeeper.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception e2) { count++; System.out.println("the " + count + " times try to acquire lock for " + path + "......"); continue; } System.out.println("success to acquire lock for " + path + " after " + count + " times try......"); break; } } }
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.roncoo.eshop.cache.model.ProductInfo; import com.roncoo.eshop.cache.service.CacheService; import com.roncoo.eshop.cache.spring.SpringContext; import com.roncoo.eshop.cache.zk.ZooKeeperSession; /** * 緩存預熱線程 */ public class CachePrewarmThread extends Thread { @Override public void run() { CacheService cacheService = (CacheService) SpringContext. getApplicationContext().getBean("cacheService"); ZooKeeperSession zkSession = ZooKeeperSession.getInstance(); // 獲取storm taskid列表 String taskidList = zkSession.getNodeData("/taskid-list"); if(taskidList != null && !"".equals(taskidList)) { String[] taskidListSplited = taskidList.split(","); for(String taskid : taskidListSplited) { String taskidLockPath = "/taskid-lock-" + taskid; boolean result = zkSession.acquireFastFailedDistributedLock(taskidLockPath); if(!result) { continue; } String taskidStatusLockPath = "/taskid-status-lock-" + taskid; zkSession.acquireDistributedLock(taskidStatusLockPath); //檢查越熱的狀態 String taskidStatus = zkSession.getNodeData("/taskid-status-" + taskid); if("".equals(taskidStatus)) { String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid); JSONArray productidJSONArray = JSONArray.parseArray(productidList); for(int i = 0; i < productidJSONArray.size(); i++) { Long productId = productidJSONArray.getLong(i); String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手機\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的規格\", \"service\": \"iphone7的售后服務\", \"color\": \"紅色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}"; ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class); cacheService.saveProductInfo2LocalCache(productInfo); cacheService.saveProductInfo2ReidsCache(productInfo); } zkSession.setNodeData(taskidStatusLockPath, "success"); } 
zkSession.releaseDistributedLock(taskidStatusLockPath); zkSession.releaseDistributedLock(taskidLockPath); } } } }
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.roncoo.eshop.cache.model.ProductInfo; import com.roncoo.eshop.cache.service.CacheService; import com.roncoo.eshop.cache.spring.SpringContext; import com.roncoo.eshop.cache.zk.ZooKeeperSession; /** * 緩存預熱線程 */ public class CachePrewarmThread extends Thread { @Override public void run() { CacheService cacheService = (CacheService) SpringContext. getApplicationContext().getBean("cacheService"); ZooKeeperSession zkSession = ZooKeeperSession.getInstance(); // 獲取storm taskid列表 String taskidList = zkSession.getNodeData("/taskid-list"); if(taskidList != null && !"".equals(taskidList)) { String[] taskidListSplited = taskidList.split(","); for(String taskid : taskidListSplited) { String taskidLockPath = "/taskid-lock-" + taskid; boolean result = zkSession.acquireFastFailedDistributedLock(taskidLockPath); if(!result) { continue; } String taskidStatusLockPath = "/taskid-status-lock-" + taskid; zkSession.acquireDistributedLock(taskidStatusLockPath); //檢查越熱的狀態 String taskidStatus = zkSession.getNodeData("/taskid-status-" + taskid); if("".equals(taskidStatus)) { String productidList = zkSession.getNodeData("/task-hot-product-list-" + taskid); JSONArray productidJSONArray = JSONArray.parseArray(productidList); for(int i = 0; i < productidJSONArray.size(); i++) { Long productId = productidJSONArray.getLong(i); String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手機\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的規格\", \"service\": \"iphone7的售后服務\", \"color\": \"紅色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:00:00\"}"; ProductInfo productInfo = JSONObject.parseObject(productInfoJSON, ProductInfo.class); cacheService.saveProductInfo2LocalCache(productInfo); cacheService.saveProductInfo2ReidsCache(productInfo); } zkSession.setNodeData(taskidStatusLockPath, "success"); } 
zkSession.releaseDistributedLock(taskidStatusLockPath); zkSession.releaseDistributedLock(taskidLockPath); } } } }