本文描述了一個系統,功能是評價和抽象地理圍欄(Geo-fencing),以及監控和分析核心地理圍欄中業務的表現。
技術棧:Spring-JQuery-百度地圖WEB SDK
存儲:Hive-Elasticsearch-MySQL-Redis
什么是地理圍欄?
LBS系統中,地理圍欄指的是虛擬邊界圍成的部分。
tips:這只是一個demo,支撐實習生的本科畢設,不代表生產環境,而且數據已經做了脫密處理,為了安全還是隱去了所有數據。
功能描述
1、地理圍欄的圈選
(1)熱力圖
熱力圖展示的是,北京市最近一天的業務密度(這里是T+1數據,在實際工作場景中往往是通過實時流采集分析實時的數據)
(2)圈選地理圍欄
系統提供了圓形(距中心點距離)、矩形、多邊形三種類型的圖形圈選,並通過百度地圖SDK采集圖形的信息。
2、地理圍欄的持久化
(1)提供地理圍欄的持久化功能
(2)地理圍欄列表
下面是持久化的地理圍欄列表,可以看到類型和圍欄信息。
當圈選完成,可以選擇持久化地理圍欄,這個圍欄將會沉淀下來,供后續業務分析和監控。
3、聚合分析
(1)提供日訂單量,日盈利和日取消率的聚合分析
例如下圖是在某個地理圍欄區域內,11月這30天內,訂單量的變化。
(2)詳細列表
提供每一天數據的詳細信息,對異常點可以標紅和預警
上面基本就是系統的全部核心功能。下面進入實現部分。
實現 - 數據准備
1、數據源
數據源應該是業務的數據庫(例如訂單庫)以及客戶端埋點日志(端動作),公司的離線采集和ETL團隊經過了漫長的工作,將數據處理好存入了Hive中。
對於本文系統來說,數據源就是Hive中的order表。要做的是將Hive中的數據導入到Elasticsearch中,使用Elasticsearch強大的GEO Query支持進行分析。
2、數據導入
數據的導入使用的是一段Java的Spark腳本。
(1)先解決依賴
spark-core是必備依賴。引入spark-hive來處理Hive中的數據。引入elasticsearch-hadoop來搞定Hive到ES的寫入。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>2.3.4</version> </dependency>
(2)編寫spark腳本
先上代碼
public class ToES implements Serializable { transient private JavaSparkContext javaSparkContext; transient private HiveContext hiveContext; private String num; /* * 初始化Load * 創建sparkContext, hiveContext * */ public ToES(String num) { this.num = num; initSparckContext(); initHiveContext(); } /* * 創建sparkContext * */ private void initSparckContext() { SparkConf sparkConf; String warehouseLocation = System.getProperty("user.dir"); sparkConf = new SparkConf() .setAppName("to-es") .set("spark.sql.warehouse.dir", warehouseLocation) .setMaster("yarn-client") .set("es.nodes", "10.93.21.21,10.93.18.34,10.93.18.35,100.90.62.33,100.90.61.14") .set("es.port", "8049").set("pushdown", "true").set("es.index.auto.create", "true"); javaSparkContext = new JavaSparkContext(sparkConf); } /* * 創建hiveContext * 用於讀取Hive中的數據 * */ private void initHiveContext() { hiveContext = new HiveContext(javaSparkContext); } /* * 使用spark-sql從hive中讀取數據, 然后寫入es. * */ public void hive2es() { String query = String.format("select * from kangaroo.order where concat_ws('-', year, month, day) = '%s' and product_id in (3,4) and area = 1", transTimeToFormat(System.currentTimeMillis() - Integer.parseInt(num)*24*60*60*1000L, "yyyy-MM-dd")); DataFrame rows = hiveContext.sql(query) .select("order_id", "starting_lng", "starting_lat", "order_status", "tip", "bouns", "pre_total_fee", "dynamic_price", "product_id", "starting_name", "dest_name", "type"); JavaRDD<Map<String, Object>> rdd = rows.toJavaRDD().map(new Function<Row, Map<String, Object>>() { /* * 轉換成Map, 解決字段類型不匹配問題 * */ @Override public Map<String, Object> call(Row row) throws Exception { Map<String, Object> map = new HashMap<String, Object>(); Map<String, Object> location = new HashMap<String, Object>(); for (int i=0; i<row.size(); i++) { String key = row.schema().fields()[i].name(); Object value = row.get(i); map.put(key, value); } location.put("lat", Double.parseDouble(map.get("starting_lat").toString())); location.put("lon", Double.parseDouble(map.get("starting_lng").toString())); map.remove("starting_lat"); map.remove("starting_lng"); map.put("location", location); map.put("date", transTimeToFormat(System.currentTimeMillis() - Integer.parseInt(num)*24*60*60*1000L, "yyyy-MM-dd")); return map; } }); Map<String, String> map = new HashMap<String, String>(); map.put("es.mapping.id", "order_id"); JavaEsSpark.saveToEs(rdd, "moon/bj", map); } public String transTimeToFormat(long currentTime, String formatStr) { String formatTime = null; try { SimpleDateFormat format = new SimpleDateFormat(formatStr); formatTime = format.format(currentTime); } catch (Exception e) { } return formatTime; } public static void main(String[] args) { String num = args[0]; ToES toES = new ToES(num); toES.hive2es(); } }
SparkContext和HiveContext的初始化,請自行參考代碼。
ES的集群配置是在sparkConf中加載進去的,加載方式請自己參照代碼。
1)數據過濾
hive-sql
select * from kangaroo.order where concat_ws('-', year, month, day) = '%s' and product_id in (3,4) and area = 1
說明:
a)Hive的order表實現為一個外部表,year/month/day是分區字段,也就是說數據是按照天為粒度掛載的。
b)product_id是業務編號,這里過濾出了目標業務的訂單。
c)area為城市編號,這里只過濾出北京。
2)列的裁剪
Elasticsearch有個弊端是由於索引的建立,當數據導入Elasticsearch數據量會膨脹,所以一定要進行維度的裁剪。
我們的訂單Hive表姑且就叫它order吧,這個表有40+個字段,我們導入到ES中,只選用了其中的12個字段。
在代碼中是,通過DataFrame的select實現的裁剪
DataFrame rows = hiveContext.sql(query) .select("order_id", "starting_lng", "starting_lat", "order_status", "tip", "bouns", "pre_total_fee", "dynamic_price", "product_id", "starting_name", "dest_name", "type");
可能會有這樣的好奇,這樣做在hive-sql中把所有字段全拿到然后在裁剪?為什么不直接在sql語句中進行裁剪?簡單解釋一下,由於spark的惰性求值,應該是沒有區別的。
3)map轉換操作
下面將dataFrame轉換成rdd,執行map操作,將每一條記錄進行處理,處理的核心邏輯,是將starting_lng、starting_lat壓成一個HashMap的location字段。
為什么要這樣做呢?
因為在Elasticsearch中要這樣存儲點的經緯度,並且將location字段聲明為geo_point類型,才能使用空間索引查詢。
然后我們順便生成了一個date字段,表示訂單是哪一天的,方便后面的以天為粒度進行聚合查詢。
4)批量存入ES
Map<String, String> map = new HashMap<String, String>(); map.put("es.mapping.id", "order_id"); JavaEsSpark.saveToEs(rdd, "moon/bj", map);
這樣就將rdd中的數據批量存入到ES中了,存入的索引是index=moon,type=bj,這里映射了order_id為ES文檔的document_id。我們下面馬上就會說如何建立moon/bj的mapping
5)ES索引建立
再將數據導入到ES之前,要建立index和mapping。
創建index=moon
curl -XPOST "http://10.93.21.21:8049/moon?pretty"
創建type=bj的mapping
curl -XPOST "http://10.93.21.21:8049/moon/bj/_mapping?pretty" -d ' { "bj": { "properties": { "order_id": {"type": "long"}, "order_status": {"type": "long"}, "tip": {"type": "long"}, "bouns": {"type": "long"}, "pre_total_fee": {"type": "long"}, "dynamic_price": {"type": "long"}, "product_id": {"type": "long"}, "type": {"type": "long"}, "dest_name": {"index": "not_analyzed","type": "string"}, "starting_name": {"index": "not_analyzed","type": "string"}, "departure_time": {"index": "not_analyzed","type": "string"}, "location": {"type" : "geo_point"}, "date": {"index": "not_analyzed", "type" : "string"} } } }'
這里要注意的是,location字段的類型-geo_point。
6)打包編譯spark程序
以yarn隊列形式運行
spark-submit --queue=root.*** to-es-1.0-SNAPSHOT-jar-with-dependencies.jar
然后在ES的head中可以看到數據已經加載進去了
至此,數據已經准備好了。
今天先到這,后面的博客會描述如何搞定百度地圖前端和Elasticsearch GEO查詢。