基於百度地圖SDK和Elasticsearch GEO查詢的地理圍欄分析系統(1)-數據准備


本文描述了一個系統,功能是評價和抽象地理圍欄(Geo-fencing),以及監控和分析核心地理圍欄中業務的表現。

技術棧:Spring-JQuery-百度地圖WEB SDK

存儲:Hive-Elasticsearch-MySQL-Redis

 

什么是地理圍欄?

LBS系統中,地理圍欄指的是虛擬邊界圍成的部分。

tips:這只是一個demo,支撐實習生的本科畢設,不代表生產環境,而且數據已經做了脫密處理,為了安全還是隱去了所有數據。

 

功能描述

 

1、地理圍欄的圈選

(1)熱力圖

熱力圖展示的是,北京市最近一天的業務密度(這里是T+1數據,在實際工作場景中往往是通過實時流采集分析實時的數據)

(2)圈選地理圍欄

系統提供了圓形(距中心點距離)、矩形、多邊形三種類型的圖形圈選,並通過百度地圖SDK采集圖形的信息。

 

2、地理圍欄的持久化

(1)提供地理圍欄的持久化功能

 

(2)地理圍欄列表

下面是持久化的地理圍欄列表,可以看到類型和圍欄信息。

當圈選完成,可以選擇持久化地理圍欄,這個圍欄將會沉淀下來,供后續業務分析和監控。

 

3、聚合分析

(1)提供日訂單量,日盈利和日取消率的聚合分析

例如下圖是在某個地理圍欄區域內,11月這30天內,訂單量的變化。

 (2)詳細列表

提供每一天數據的詳細信息,對異常點可以標紅和預警

 

 

上面基本就是系統的全部核心功能。下面進入實現部分。

 

實現 - 數據准備

1、數據源

數據源應該是業務的數據庫(例如訂單庫)以及客戶端埋點日志(端動作),公司的離線采集和ETL團隊經過了漫長的工作,將數據處理好存入了Hive中。

對於本文系統來說,數據源就是Hive中的order表。要做的是將Hive中的數據導入到Elasticsearch中,使用Elasticsearch強大的GEO Query支持進行分析。

 

2、數據導入

數據的導入使用的是一段Java的Spark腳本。

(1)先解決依賴

spark-core是必備依賴。引入spark-hive來處理Hive中的數據。引入elasticsearch-hadoop來搞定Hive到ES的寫入。

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>2.3.4</version>
        </dependency>

 

(2)編寫spark腳本

先上代碼

public class ToES implements Serializable {

    transient private JavaSparkContext javaSparkContext;
    transient private HiveContext hiveContext;
    private String num;
    
    /*
    *   初始化Load
    *   創建sparkContext, hiveContext
    * */
    public ToES(String num) {
        this.num = num;
        initSparckContext();
        initHiveContext();
    }

    /*
    *   創建sparkContext
    * */
    private void initSparckContext() {
        SparkConf sparkConf;
        String warehouseLocation = System.getProperty("user.dir");
        sparkConf = new SparkConf()
                .setAppName("to-es")
                .set("spark.sql.warehouse.dir", warehouseLocation)
                .setMaster("yarn-client")
                .set("es.nodes", "10.93.21.21,10.93.18.34,10.93.18.35,100.90.62.33,100.90.61.14")
                .set("es.port", "8049").set("pushdown", "true").set("es.index.auto.create", "true");
        javaSparkContext = new JavaSparkContext(sparkConf);
    }

    /*
    *   創建hiveContext
    *   用於讀取Hive中的數據
    * */
    private void initHiveContext() {
        hiveContext = new HiveContext(javaSparkContext);
    }

    /*
    *   使用spark-sql從hive中讀取數據, 然后寫入es.
    * */
    public void hive2es() {
        String query = String.format("select * from kangaroo.order where concat_ws('-', year, month, day) = '%s' and product_id in (3,4) and area = 1",
                transTimeToFormat(System.currentTimeMillis() - Integer.parseInt(num)*24*60*60*1000L, "yyyy-MM-dd"));
        DataFrame rows = hiveContext.sql(query)
                .select("order_id", "starting_lng", "starting_lat", "order_status", "tip", "bouns",
                        "pre_total_fee", "dynamic_price", "product_id", "starting_name", "dest_name", "type");
        JavaRDD<Map<String, Object>> rdd = rows.toJavaRDD().map(new Function<Row, Map<String, Object>>() {
            /*
            *   轉換成Map, 解決字段類型不匹配問題
            * */
            @Override
            public Map<String, Object> call(Row row) throws Exception {
                Map<String, Object> map =  new HashMap<String, Object>();
                Map<String, Object> location = new HashMap<String, Object>();
                for (int i=0; i<row.size(); i++) {
                    String key = row.schema().fields()[i].name();
                    Object value = row.get(i);
                    map.put(key, value);
                }
                location.put("lat", Double.parseDouble(map.get("starting_lat").toString()));
                location.put("lon", Double.parseDouble(map.get("starting_lng").toString()));
                map.remove("starting_lat");
                map.remove("starting_lng");
                map.put("location", location);
                map.put("date", transTimeToFormat(System.currentTimeMillis() - Integer.parseInt(num)*24*60*60*1000L, "yyyy-MM-dd"));
                return map;
            }
        });
        Map<String, String> map = new HashMap<String, String>();
        map.put("es.mapping.id", "order_id");
        JavaEsSpark.saveToEs(rdd, "moon/bj", map);
    }

    public String transTimeToFormat(long currentTime, String formatStr) {
        String formatTime = null;
        try {
            SimpleDateFormat format =  new SimpleDateFormat(formatStr);
            formatTime = format.format(currentTime);
        } catch (Exception e) {
        }
        return formatTime;
    }

    public static void main(String[] args) {
        String num = args[0];
        ToES toES = new ToES(num);
        toES.hive2es();
    }
}

SparkContext和HiveContext的初始化,請自行參考代碼。

ES的集群配置是在sparkConf中加載進去的,加載方式請自己參照代碼。

 

1)數據過濾

hive-sql

select * from kangaroo.order where concat_ws('-', year, month, day) = '%s' and product_id in (3,4) and area = 1

說明:

a)Hive的order表實現為一個外部表,year/month/day是分區字段,也就是說數據是按照天為粒度掛載的。

b)product_id是業務編號,這里過濾出了目標業務的訂單。

c)area為城市編號,這里只過濾出北京。

 

2)列的裁剪

Elasticsearch有個弊端是由於索引的建立,當數據導入Elasticsearch數據量會膨脹,所以一定要進行維度的裁剪。

我們的訂單Hive表姑且就叫它order吧,這個表有40+個字段,我們導入到ES中,只選用了其中的12個字段。

在代碼中是,通過DataFrame的select實現的裁剪

DataFrame rows = hiveContext.sql(query)
                .select("order_id", "starting_lng", "starting_lat", "order_status", "tip", "bouns",
                        "pre_total_fee", "dynamic_price", "product_id", "starting_name", "dest_name", "type");

可能會有這樣的好奇,這樣做在hive-sql中把所有字段全拿到然后在裁剪?為什么不直接在sql語句中進行裁剪?簡單解釋一下,由於spark的惰性求值,應該是沒有區別的。

 

3)map轉換操作

下面將dataFrame轉換成rdd,執行map操作,將每一條記錄進行處理,處理的核心邏輯,是將starting_lng、starting_lat壓成一個HashMap的location字段。

為什么要這樣做呢?

因為在Elasticsearch中要這樣存儲點的經緯度,並且將location字段聲明為geo_point類型,才能使用空間索引查詢。

然后我們順便生成了一個date字段,表示訂單是哪一天的,方便后面的以天為粒度進行聚合查詢。

 

4)批量存入ES

        Map<String, String> map = new HashMap<String, String>();
        map.put("es.mapping.id", "order_id");
        JavaEsSpark.saveToEs(rdd, "moon/bj", map);

這樣就將rdd中的數據批量存入到ES中了,存入的索引是index=moon,type=bj,這里映射了order_id為ES文檔的document_id。我們下面馬上就會說如何建立moon/bj的mapping

 

5)ES索引建立 

再將數據導入到ES之前,要建立index和mapping。

創建index=moon

curl -XPOST "http://10.93.21.21:8049/moon?pretty"

創建type=bj的mapping

curl -XPOST "http://10.93.21.21:8049/moon/bj/_mapping?pretty" -d '
{
    "bj": {
        "properties": {
            "order_id": {"type": "long"},
            "order_status": {"type": "long"},
            "tip": {"type": "long"},
            "bouns": {"type": "long"},
            "pre_total_fee": {"type": "long"},
            "dynamic_price": {"type": "long"},
            "product_id": {"type": "long"},
            "type": {"type": "long"},
            "dest_name": {"index": "not_analyzed","type": "string"},
            "starting_name": {"index": "not_analyzed","type": "string"},
            "departure_time": {"index": "not_analyzed","type": "string"},
            "location": {"type" : "geo_point"},
            "date": {"index": "not_analyzed", "type" : "string"}
        }
    }
}'

這里要注意的是,location字段的類型-geo_point。

 

6)打包編譯spark程序

以yarn隊列形式運行

spark-submit --queue=root.*** to-es-1.0-SNAPSHOT-jar-with-dependencies.jar

然后在ES的head中可以看到數據已經加載進去了

 

至此,數據已經准備好了。

今天先到這,后面的博客會描述如何搞定百度地圖前端和Elasticsearch GEO查詢。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM