YDB基本使用詳解(轉)


第七章YDB基本使用詳解

一、如何與YDB對接(交互)

目前延雲YDB提供如下幾種方式

l命令行的方式

lWeb http接口的方式

lJDBC接口的方式

通過Java編程接入

通過可視化SQL分析統計接入

通過報表分析工具接入

二、命令行接入

進入ya100的安裝目錄的bin目錄

1.直接執行 ./conn.sh 即可。

2.通過./sql.sh -f xxx.sql 直接執行文件中的SQL

 

三、WEB接入

WEB接口主要是為了給那些不支持HDBC訪問的程序提供接口支持,如PHP

1.圖形SQL 提交地址

http://xxx.xx.xx.xx:1210/sparkview

2.Json接口

http://xxx.xx.xx.xx:1210/sparksql?sql=?

SQL參數可以GET方式提交,也可以POST方式提交

四、JDBC接口

1.JDBC接入方式連接字符串

Connection conn = DriverManager.getConnection("jdbc:hive2://ydbmaster:10009/default", "hdfs", "");

 

l10009表示JDBC的端口號,配置的值在ya100_evn.sh里面可以找到

lhdfs表示連接時所使用的Hadoop賬號,大家也要跟配置文件中一致,以免其他未知賬號產生垃圾文件沒有及時的清理掉,以及造成Hadoop權限問題。

這個賬號的配置目前存在兩個位置,請大家配置一致,使用同一個賬號。

 

 

 

2.JAVA編程接口

                  Class.forName("org.apache.Hive.jdbc.HiveDriver");

                  Connection conn = DriverManager.getConnection("jdbc:hive2://ydbmaster:10009/default", "hdfs", "");

                  Statement smst = conn.createStatement();

                  ResultSet rs = smst.executeQuery("/*ydb.pushdown('->')*/ select * from ydb_example_shu where ydbpartion = '3000w' limit 10 /*('<-')pushdown.ydb*/");

                  ResultSetMetaData m = rs.getMetaData();

                  int columns = m.getColumnCount();

                  for (int i = 1; i <= columns; i++) {

                           System.out.print(m.getColumnName(i));

                           System.out.print("\t\t");

                  }

                  while (rs.next()) {

                           for (int i = 1; i <= columns; i++) {

                                    System.out.print(rs.getString(i));

                                    System.out.print("\t\t");

                           }

                           System.out.println();

                  }

                  rs.close();

                  conn.close();

依賴的JDBC客戶端jar包可以從這個地址獲取,本質上就是HIVE的thrift接口,依賴的jar包也是Hive的jar包

http://url.cn/42R4CG8

 

 

 

 

3.通過可視化SQL分析統計接入

SQL分析工具有很多,只要支持HIVE接口即可,免費的有Squirrel、收費的有DbVisualizer等

 

4.通過報表分析工具接入

通過可視化報表分析工具,可以極大的提高程序開發的效率,只要是支持HIVE接口的可視化報表工具,都可以與YDB集成,下面以帆軟報表為例。

 

 

 

 

五、YDB分區

1.關於分區的說明

       隨着時間的日積月累,單個索引會越來越大,從而導致系統瓶頸。YDB不會將全部的數據都完整的創建在一個索引中,YDB會對數據進行分區,分區的方式由用戶來定義,可以按照日期分區,也可以按照某些固定的HASH方式來分區。

       一條數據的分區,在導入的時候直接指定,具體請參考后面的數據導入用法。

       如果按照日期進行分區,每天就會生成一個分區,如需查詢哪天的數據,就去對應的分區中檢索,其他的分區則閑置。

       YDB 的SQL需要通過ydbpartion來指定分區; SQL查詢必須要設置分區,而且要寫在SQL的最外層。

       如果沒有指定ydbpartion分區的查詢,ydb表默認會去查詢 " ydb_default_partion" 這個分區,也就是說,如果我們真的不想進行數據分區,不想在sql上添加ydbpartion的條件來指定分區,那么請將數據都導入到 " ydb_default_partion"這個分區里面。

 

設置分區例子如下:

 ydbpartion ='20140928'

 ydbpartion in ('20140928','20140927')

目前不支持大於等於,小於等於的范圍指定分區,僅支持等於與in的方式。

 

2.關於分區的數量與粒度,控制多少比較好?

l如果我們的數據可以按照時間進行切分,是不是切分的越細越好?

       很遺憾,YDB並不適合特別多的分區,分區越多代表索引文件越多

1)YDB中打開一個索引是有很大的開銷的,打開一個索引加載的列的信息、索引的BlockTree的相關主干節點等,需要消耗較多的內存,而且要持久化到內存里去維護這個索引的狀態。這就是為什么大家會發現,對於一個表第一次查詢會比較慢,但是我們進行一次count以后,在進行別的查詢就會快很多。

2)YDB在一個進程里能夠打開的索引數量是有限的,如果超過了打開的索引文件數量,那么就要關閉一些索引,以保證內存不會OOM。

3)小文件太多,對HDFS的NameNode的壓力較大。

 

l那么分區粒度控制在多少為好?

基本原則就是在避免索引頻繁的打開與關閉的情況下,索引粒度越小越好。

1)如果我們的數量不是很大,一年加在一起還不到10億,那么我就建議采用按年分區。

2)如果我們的數據處於中等,每月的數據增量為1億左右,那么我們建議按照季度分區。

3)如果我們的數據每天寫入量特別大,如果按照月份分區,單個索引太大會造成寫入瓶頸,那么我們建議按照天進行分區。

很多時候我們還可以根據不同的查詢方式,采用兩種粒度的分區

1)最近一兩天的數據經常被查詢,我們最近3天的數據按照天進行分區

2)但是偶爾也會發生查詢整年的數據,如果采用按天分區的話,一次打開的索引太多,那么我們可以再加一個按照季度的分區。

3)按天的數據分區只保存最近7天的數據,超過7天的數據會通過insert的方式歸檔按照季度的分區里。

      

 

六、YDB的數據類型

1.基本類型

基本類型的存儲方式均為 按列存儲

YDB類型

只索引

只存儲

Hive類型

解釋

string

synn

---

string

字符串類型,該類型不分詞,通常用來存儲比較短的字符串,如類目

tint

tiynn

---

int

整形32位-適合大范圍的range過濾查詢

tlong

tlynn

---

bigint

整形64位-適合大范圍的range過濾查詢

tdouble

tdynn

---

double

Double類型-適合大范圍的range過濾查詢

tfloat

tfynn

---

float

Float類型-適合大范圍的range過濾查詢

int

iynn

---

int

整形32位,占用存儲空間少,但是范圍查找性能低

long

lynn

---

bigint

整形64位,占用存儲空間少,但是范圍查找性能低

double

dynn

---

double

Double類型,占用存儲空間少,但是范圍查找性能低

float

fynn

---

float

Float類型,占用存儲空間少,但是范圍查找性能低

geopoint

---

---

bigint

用於地理位置搜索-使用方法詳見《26.地理位置感知搜索.txt》

 

2.分詞類型

       分詞( Word Segmentation) 指的是將一個詞字序列切分成一個一個單獨的詞。分詞就是將連續的詞序列按照一定的規范重新組合成詞序列的過程.

       分詞類型,均為按行存儲,在YDB中可以進行模糊檢索,但是不能在SQL里面進行group by(YSQL函數以外是可以的)。

 

YDB類型

只索引

只存儲

Hive類型

解釋

simpletext

simpletextyn

simpletextny

string

 ydb內置的普通文本分詞 采用1~3元分詞

haoma

haomayn

haomany

string

ydb內置的適合號碼類型的分詞,采用3~5元分詞實現,分詞粒度為char

chepai

chepaiyn

chepainy

string

ydb內置的適合號碼類型的分詞,采用2~5元分詞實現,分詞粒度為char

text

tyn

tny

string

為lucene默認的standard分詞,在(處理手機號,郵箱,IP地址,網址等中英文與字典組合的數據上 不准確,請慎用)

cjkyy

cjkyn

cjkny

string

為lucene默認的cjk分詞即二元分詞 (處理手機號,郵箱,IP地址,網址等中英文與字典組合的數據上 不准確,請慎用)

 

以下類型除了分詞外,還保存了分詞后的詞的順序 ,可以進行順序匹配

 

YDB類型

只索引

只存儲

Hive類型

解釋

charlike

---

---

string

按照字符char 1~5元分詞 (效果較好,term區分了詞元,適合車牌,手機號類型的較短文本)

wordlike

---

---

string

按字與詞 1~3元分詞 (效果較好,term區分了詞元,適合文本類型)

pchepai

---

---

string

按照字符char 2~5元分詞

phaoma

---

---

string

按照字符char 3~5元分詞

psimpletext

---

---

string

按字與詞 1~3元分詞

pyy

pyn

pny

string

lucene的cjk分詞,中文采用二元分詞,英文與數字采用 單字分詞

 

3.多值列類型

有些時候,我們想在一個列里面存儲多個值的時候,就可以考慮使用多值列了

比如說,可以將一個人 的多個標簽值 存儲在一個記錄里面,一個人的每天的行為數據 放在一個記錄里面。

一定要注意,

1.字符串類型的多值列,返回的值的無序,並且是排重的,故這塊有額外注意。

2.數值型的則是有序的(與導入的順序一致),並且是沒有排重的。

3.傳遞的數值是按照空格拆分的,如 11 22 33 44 

4.如果傳遞的是空值,會當做null處理

多值列所有數據類型均為按列存儲

 

YDB類型

Hive類型

解釋

mt_syn

string

 string類型的多值列

mt_tlyn

string

tlong類型的多值列

mt_lyn

string

long類型的多值列

mt_tdyn

string

tdouble類型的多值列

mt_dyn

string

double類型的多值列

mt_iyn

string

int類型的多值列

mt_tiyn

string

tint類型的多值列

mt_fyn

string

float類型的多值列

mt_tfyn

string

tfolat類型的多值列

 

 

 

 

七、創建YDB表

/*ydb.pushdown('->')*/

create table ydb_example_shu(

phonenum long,

usernick string,

ydb_sex string,

ydb_province string,

ydb_grade string,

ydb_age string,

ydb_blood string,

ydb_zhiye string,

ydb_earn string,

ydb_prefer string,

ydb_consume string,

ydb_day string,

amtdouble tdouble,

amtlong int,

content textcjk

)

/*('<-')pushdown.ydb*/

 

 

 

八、將HIVE表中的數據導入到YDB中

通過ydbpartion表向YDB中導入數據,下面示例中的ydb_example_shu為YDB表的表名,3000w為YDB表的分區名。

1.直接追加數據

insert into  table ydbpartion

select 'ydb_example_shu', '3000w', '',

    YROW(

        'phonenum',phonenum,

        'usernick',usernick,

        'ydb_sex',ydb_sex,

        'ydb_province',ydb_province,

        'ydb_grade',ydb_grade,

        'ydb_age',ydb_age,

        'ydb_blood',ydb_blood,

        'ydb_zhiye',ydb_zhiye,

        'ydb_earn',ydb_earn,

        'ydb_prefer',ydb_prefer,

        'ydb_consume',ydb_consume,

        'ydb_day',ydb_day,

        'amtdouble',amtdouble,

        'amtlong',amtlong,

        'content',content

    )

from ydb_import_txt;

 

2.覆蓋數據

insert overwrite table  ydbpartion

 select 'ydb_example_shu', '3000w', '',

    YROW(

        'phonenum',phonenum,

        'usernick',usernick,

        'ydb_sex',ydb_sex,

        'ydb_province',ydb_province,

        'ydb_grade',ydb_grade,

        'ydb_age',ydb_age,

        'ydb_blood',ydb_blood,

        'ydb_zhiye',ydb_zhiye,

        'ydb_earn',ydb_earn,

        'ydb_prefer',ydb_prefer,

        'ydb_consume',ydb_consume,

        'ydb_day',ydb_day,

        'amtdouble',amtdouble,

        'amtlong',amtlong,

        'content',content

    )

from ydb_import_txt;

 

3.在追加數據前,先執行按條件刪除

insert into  table ydbpartion

 select 'ydb_example_shu', '3000w', 'ydb_sex='男'  and ydb_blood='A'',

    YROW(

        'phonenum',phonenum,

        'usernick',usernick,

        'ydb_sex',ydb_sex,

        'ydb_province',ydb_province,

        'ydb_grade',ydb_grade,

        'ydb_age',ydb_age,

        'ydb_blood',ydb_blood,

        'ydb_zhiye',ydb_zhiye,

        'ydb_earn',ydb_earn,

        'ydb_prefer',ydb_prefer,

        'ydb_consume',ydb_consume,

        'ydb_day',ydb_day,

        'amtdouble',amtdouble,

        'amtlong',amtlong,

        'content',content

    )

from ydb_import_txt;

 

 

  

4.HIVE表數據導入優化-控制並發數

#######為什么要控制並發數############

1)啟動時候的Map數量不容易控制,如果啟動的map數量很多,而Spark又沒有容量調度器,會占滿所有的資源,影響查詢。

2)所以很多時候我們的業務期望,在進行數據導入的時候,不要啟動太多的Map數量,而是希望留出一部分資源,能讓給查詢,於是控制Map數量就顯得特別重要了。

3)我們導入數據,傾向於數據能更均衡一些,這樣查詢的時候,不會因為數據傾斜而影響性能。

4)針對大量小文件,Spark並沒有像Hive那樣使用了combine inputformat ,合並map查詢,這樣會導致啟動的map數量很多,我們希望依然采用Hive那種能夠將一些小的Map進行合並。

 

YDB提供了combine的方法,用來解決上述問題

類名為cn.NET.ycloud.ydb.handle.YdbCombineInputFormat (舊版名字為:cn.Net.ycloud.ydb.handle.Ya100FixNumCombineTextInputFormat)

 

1)####文本形式的示例####

drop table ydb_import_txt;

CREATE external  table ydb_import_txt(

phonenum string, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong bigint,content string,multyvalue string

)

row format delimited fields terminated by ','

stored as

    INPUTFORMAT 'cn.net.ycloud.ydb.handle.YdbCombineInputFormat'

    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

location '/data/example/ydb'

TBLPROPERTIES (

    'ydb.combine.input.format.raw.format'='org.apache.hadoop.mapred.TextInputFormat'

);

select count(*) from ydb_import_txt limit 10;

 

insert overwrite table  ydbpartion

select 'ydb_example_shu', 'txt', '',

    YROW(

        'phonenum',phonenum,

        'usernick',usernick,

        'ydb_sex',ydb_sex,

        'ydb_province',ydb_province,

        'ydb_grade',ydb_grade,

        'ydb_age',ydb_age,

        'ydb_blood',ydb_blood,

        'ydb_zhiye',ydb_zhiye,

        'ydb_earn',ydb_earn,

        'ydb_prefer',ydb_prefer,

        'ydb_consume',ydb_consume,

        'ydb_day',ydb_day,

        'amtdouble',amtdouble,

        'amtlong',amtlong,

        'content',content

    )

from ydb_import_txt;

 

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_shu where ydbpartion = 'txt'

/*('<-')pushdown.ydb*/

;

 

 

2)####RCFILE格式示例####

drop table ydb_import_rcfile;

 

CREATE external  table ydb_import_rcfile(

phonenum string, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong bigint,content string

)

ROW FORMAT SERDE  'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'

STORED AS

    INPUTFORMAT    'cn.net.ycloud.ydb.handle.YdbCombineInputFormat' 

    OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'

TBLPROPERTIES (

    'ydb.combine.input.format.raw.format'='org.apache.hadoop.hive.ql.io.RCFileInputFormat'

);

 

insert overwrite  table ydb_import_rcfile select * from ydb_import_txt;

select count(*) from ydb_import_rcfile limit 10;

 

insert overwrite table  ydbpartion

select 'ydb_example_shu', 'rcfile', '',

    YROW(

        'phonenum',phonenum,

        'usernick',usernick,

        'ydb_sex',ydb_sex,

        'ydb_province',ydb_province,

        'ydb_grade',ydb_grade,

        'ydb_age',ydb_age,

        'ydb_blood',ydb_blood,

        'ydb_zhiye',ydb_zhiye,

        'ydb_earn',ydb_earn,

        'ydb_prefer',ydb_prefer,

        'ydb_consume',ydb_consume,

        'ydb_day',ydb_day,

        'amtdouble',amtdouble,

        'amtlong',amtlong,

        'content',content

    )

from ydb_import_rcfile;

 

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_shu where ydbpartion = 'rcfile'

/*('<-')pushdown.ydb*/

;

 

 

 

3)####SEQUENCEFILE格式示例####

drop table ydb_import_sequencefile;

 

CREATE external  table ydb_import_sequencefile(

phonenum string, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong bigint,content string

)

ROW FORMAT SERDE  'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'

STORED AS

    INPUTFORMAT    'cn.net.ycloud.ydb.handle.YdbCombineInputFormat' 

    OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'

TBLPROPERTIES (

    'ydb.combine.input.format.raw.format'='org.apache.hadoop.mapred.SequenceFileInputFormat'

);

 

SET hive.exec.compress.output=true;

SET mapred.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec;

SET mapred.output.compression.type=BLOCK;

insert overwrite  table ydb_import_sequencefile select * from ydb_import_txt;

select count(*) from ydb_import_sequencefile limit 10;

 

insert overwrite table  ydbpartion

select 'ydb_example_shu', 'sequencefile', '',

    YROW(

        'phonenum',phonenum,

        'usernick',usernick,

        'ydb_sex',ydb_sex,

        'ydb_province',ydb_province,

        'ydb_grade',ydb_grade,

        'ydb_age',ydb_age,

        'ydb_blood',ydb_blood,

        'ydb_zhiye',ydb_zhiye,

        'ydb_earn',ydb_earn,

        'ydb_prefer',ydb_prefer,

        'ydb_consume',ydb_consume,

        'ydb_day',ydb_day,

        'amtdouble',amtdouble,

        'amtlong',amtlong,

        'content',content

    )

from ydb_import_sequencefile;

 

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_shu where ydbpartion = 'sequencefile'

/*('<-')pushdown.ydb*/

 

4)####PARQUET格式示例####

###Spark內部對SERDE含有Parquet格式的類名進行了特殊處理,會導致設置的inputformat不生效,所以YDB也特殊處理下,就換成不含有Parquet的名字

drop table ydb_import_parquet;

CREATE external  table ydb_import_parquet(

phonenum string, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong bigint,content string

)

ROW FORMAT SERDE  'cn.net.ycloud.ydb.handle.combine.YdbParHiveSerDe'

STORED AS

    INPUTFORMAT    'cn.net.ycloud.ydb.handle.YdbCombineInputFormat' 

    OUTPUTFORMAT   'cn.net.ycloud.ydb.handle.combine.YdbParMapredParquetOutputFormat'

TBLPROPERTIES (

    'ydb.combine.input.format.raw.format'='org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'

);

 

set parquet.block.size=16777216;

insert overwrite  table ydb_import_parquet select * from ydb_import_txt;

select count(*) from ydb_import_parquet limit 10;

insert overwrite table  ydbpartion

select 'ydb_example_shu', 'parquet', '',

    YROW(

        'phonenum',phonenum,

        'usernick',usernick,

        'ydb_sex',ydb_sex,

        'ydb_province',ydb_province,

        'ydb_grade',ydb_grade,

        'ydb_age',ydb_age,

        'ydb_blood',ydb_blood,

        'ydb_zhiye',ydb_zhiye,

        'ydb_earn',ydb_earn,

        'ydb_prefer',ydb_prefer,

        'ydb_consume',ydb_consume,

        'ydb_day',ydb_day,

        'amtdouble',amtdouble,

        'amtlong',amtlong,

        'content',content

    )

from ydb_import_parquet;

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_shu where ydbpartion = 'parquet'

/*('<-')pushdown.ydb*/

 

 

 

九、YDB 查詢SQL 寫法

注意YDB的表強制必須指定分區

為了區分YDB表與Hive表,YDB語句需要使用

/*ydb.pushdown('->')*/ 與 /*('<-')pushdown.ydb*/ 前后包含起來,以方便解析

1.基本示例

----count(*)計數

/*ydb.pushdown('->')*/

select count(*) from ydb_example_shu where ydbpartion = '2015'

/*('<-')pushdown.ydb*/ ;

 

----數據預覽

/*ydb.pushdown('->')*/

select * from ydb_example_shu where ydbpartion = '3000w' limit 10

/*('<-')pushdown.ydb*/;

 

----全文檢索

/*ydb.pushdown('->')*/

select content,usernick from ydb_example_shu where ydbpartion = '3000w' and content='王老吉' limit 10

/*('<-')pushdown.ydb*/;

 

----多個條件組合過濾

/*ydb.pushdown('->')*/

select ydb_sex,ydb_grade,ydb_age,ydb_blood,amtlong from ydb_example_shu where ydbpartion = '3000w' and ydb_sex='女' and ydb_grade='本科' and (ydb_age='20到30歲' or ydb_blood='O') and  (amtlong like '([3000 TO 4000] )') limit 10

/*('<-')pushdown.ydb*/;

 

----sum求和

/*ydb.pushdown('->')*/

select sum(amtdouble) from ydb_example_shu where ydbpartion = '3000w'

/*('<-')pushdown.ydb*/;

 

----avg求平均數

/*ydb.pushdown('->')*/

select avg(amtdouble) as avgamt from ydb_example_shu where ydbpartion = '3000w'

/*('<-')pushdown.ydb*/;

 

----更復雜點的統計

/*ydb.pushdown('->')*/

select count(*),count(amtdouble),avg(amtdouble),sum(amtdouble),min(amtdouble),max(amtdouble)

,min(ydb_province),max(ydb_province) from ydb_example_shu where ydbpartion = '3000w'

/*(‘<-’)pushdown.ydb*/;

 

----單列group by

/*ydb.pushdown('->')*/

select ydb_sex,count(*),count(amtdouble),sum(amtdouble) from ydb_example_shu where ydbpartion = '3000w' group by ydb_sex limit 10

/*('<-')pushdown.ydb*/;

 

----多列group by

/*ydb.pushdown('->')*/

select ydb_sex,ydb_province,count(*) as cnt,count(amtdouble),sum(amtdouble) from ydb_example_shu where ydbpartion = '3000w' group by ydb_sex,ydb_province order by cnt desc limit 10

/*('<-')pushdown.ydb*/;

 

----top N 排序

/*ydb.pushdown('->')*/

select ydb_sex, phonenum,amtlong,amtdouble

 from ydb_example_shu where ydbpartion='3000w'  order by amtdouble desc ,amtlong limit 10

/*('<-')pushdown.ydb*/;

 

2.YDB特有的BlockSort排序(排序大躍進)

   按照時間逆序排序可以說是很多日志系統的硬指標。在延雲YDB系統中,我們改變了傳統的暴力排序方式,通過索引技術,可以超快對數據進行單列排序,不需要全表暴力掃描,這個技術我們稱之為BlockSort,目前支持tlong、tdouble、tint、tfloat四種數據類型。

   由於BlockSort是借助搜索的索引來實現的,所以采用blockSort的排序,不需要暴力掃描,性能有大幅度的提升。

   BlockSort的排序,並非是預計算的方式,可以全表進行排序,也可以基於任意的過濾篩選條件進行過濾排序。

 

 

正常寫法

blockSort寫法

單列升序

/*ydb.pushdown('->')*/

select tradetime, nickname from blocksort_ydb

 order by tradetime limit 10

/*('<-')pushdown.ydb*/;

 

/*ydb.pushdown('->')*/

select tradetime, nickname from blocksort_ydb where

    ydbkv='blocksort.field:tradetime' and 
    ydbkv='blocksort.desc:false' and

    ydbkv='blocksort.limit:10'

     order by tradetime limit 10

/*('<-')pushdown.ydb*/;

單列降序

/*ydb.pushdown('->')*/

select tradetime, nickname from blocksort_ydb

 order by tradetime desc limit 10

/*('<-')pushdown.ydb*/;

/*ydb.pushdown('->')*/

select tradetime, nickname from blocksort_ydb where

    ydbkv='blocksort.field:tradetime' and

    ydbkv='blocksort.limit:10' and

    ydbkv='blocksort.desc:true'

     order by tradetime desc limit 10

/*('<-')pushdown.ydb*/;

3.數據導出

 

----導出數據到hive表

insert overwrite table ydb_import_importtest

/*ydb.pushdown('->')*/

select  phonenum,usernick,ydb_sex,ydb_province,

ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,

ydb_prefer,ydb_consume,ydb_day,amtdouble,amtlong,content

from ydb_example_shu where ydbpartion = '3000w'

/*('<-')pushdown.ydb*/;

 

#有limit的導出示例 (在Spark的當前版本有BUG,需要采用如下變通方法解決)

insert overwrite table ydb_import_importtest

select * from (

/*ydb.pushdown('->')*/

    select 

     phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,amtdouble,amtlong,content 

from ydb_example_shu where ydbpartion = '3000w' and ydbkv='export.max.return.docset.size:1000' 

/*('<-')pushdown.ydb*/

   

) tmp  order by rand() limit 1000;

 

----數據導出到YDB的其他分區里示例

insert overwrite table ydbpartion

select 'ydb_example_shu', 'test3', '',

    YROW(

        'phonenum',tmp.phonenum,

        'usernick',tmp.usernick,

        'ydb_sex',tmp.ydb_sex,

        'ydb_province',tmp.ydb_province,

        'ydb_grade',tmp.ydb_grade,

        'ydb_age',tmp.ydb_age,

        'ydb_blood',tmp.ydb_blood,

        'ydb_zhiye',tmp.ydb_zhiye,

        'ydb_earn',tmp.ydb_earn,

        'ydb_prefer',tmp.ydb_prefer,

        'ydb_consume',tmp.ydb_consume,

        'ydb_day',tmp.ydb_day,

        'amtdouble',tmp.amtdouble,

        'amtlong',tmp.amtlong,

        'content',tmp.content

    )

from (

/*ydb.pushdown('->')*/

select

    phonenum,usernick,ydb_sex,ydb_province,ydb_grade,ydb_age,ydb_blood,ydb_zhiye,ydb_earn,ydb_prefer,ydb_consume,ydb_day,amtdouble,amtlong,content 

from ydb_example_shu where ydbpartion = '3000w'

/*('<-')pushdown.ydb*/

) tmp

;

 

----導出數據到HDFS

 

由於Spark當前版本無法通過insert Directory的方式直接導出數據到HDFS,但是可以將數據導出到Hive表,故數據導出到HDFS可以通過導出到Hive表變通的方式來解決

 

可以通過創建一個導出表來解決

 CREATE external  table ydb_import_importtest(

    phonenum bigint, usernick string, ydb_sex string, ydb_province string, ydb_grade string, ydb_age string, ydb_blood string, ydb_zhiye string, ydb_earn string, ydb_prefer string, ydb_consume string, ydb_day string, amtdouble double,amtlong int,content string

)location '/data/example/ydb_import_importtest';

 

 

如果我們創建表的時候,沒有加location,我們可以通過show create table xxx表名 可以看到location的位置

 

 

4.多表關聯示例

 

1)---兩個卡口left semi join

  select  k1.vehiclePlate as vehiclePlate from (

           /*ydb.pushdown('->')*/

          select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'    

          /*('<-')pushdown.ydb*/

    ) k1

    LEFT SEMI JOIN

   (

       /*ydb.pushdown('->')*/

       select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='2'   

       /*('<-')pushdown.ydb*/

    ) k2

    on (k1.vehiclePlate=k2.vehiclePlate);

 

+---------------+--+

| vehiclePlate  |

+---------------+--+

| c22           |

| c23           |

| c33           |

| c34           |

+---------------+--+

 

2)---兩個卡口left join

  select  k1.vehiclePlate as vehiclePlate,k2.vehiclePlate from (

       /*ydb.pushdown('->')*/

       select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'

       /*('<-')pushdown.ydb*/

) k1

LEFT JOIN

 (

    /*ydb.pushdown('->')*/

    select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'

    /*('<-')pushdown.ydb*/

) k2

on (k1.vehiclePlate=k2.vehiclePlate);

 

+---------------+---------------+--+

| vehiclePlate  | vehiclePlate  |

+---------------+---------------+--+

| c11           | NULL          |

| c22           | c22           |

| c23           | c23           |

| c33           | c33           |

| c34           | c34           |

+---------------+---------------+--+

 

 

3)---三個卡口left semi join

select k21.vehiclePlate from(

    select  k1.vehiclePlate as vehiclePlate from (

            /*ydb.pushdown('->')*/

             select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'

           /*('<-')pushdown.ydb*/

    ) k1

    LEFT SEMI JOIN

   (

       /*ydb.pushdown('->')*/

       select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='2'

      /*('<-')pushdown.ydb*/ 

    ) k2

    on (k1.vehiclePlate=k2.vehiclePlate)

 ) k21

LEFT SEMI JOIN

(

   /*ydb.pushdown('->')*/

   select vehiclePlate,tollCode from vehiclepass  where ydbpartion = '3000w' and tollCode='3' 

  /*('<-')pushdown.ydb*/

 ) k22 on k21.vehiclePlate=k22.vehiclePlate order by k21.vehiclePlate;

 

+---------------+--+

| vehiclePlate  |

+---------------+--+

| c33           |

| c34           |

+---------------+--+

 

 

 

4)---三個卡口left join

select k21.vehiclePlate,k22.vehiclePlate from(

    select  k1.vehiclePlate as vehiclePlate from (

            /*ydb.pushdown('->')*/

            select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1' 

           /*('<-')pushdown.ydb*/

    ) k1

    LEFT JOIN

   (

         /*ydb.pushdown('->')*/

         select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='2'

         /*('<-')pushdown.ydb*/

    ) k2

    on (k1.vehiclePlate=k2.vehiclePlate)

 ) k21

LEFT JOIN

(

     /*ydb.pushdown('->')*/

    select vehiclePlate,tollCode from vehiclepass  where ydbpartion = '3000w' and tollCode='3' 

    /*('<-')pushdown.ydb*/

 ) k22 on k21.vehiclePlate=k22.vehiclePlate ;

 +---------------+---------------+--+

| vehiclePlate  | vehiclePlate  |

+---------------+---------------+--+

| c11           | NULL          |

| c22           | NULL          |

| c23           | NULL          |

| c33           | c33           |

| c34           | c34           |

+---------------+---------------+--+

 

 

5)----三個卡口 先left SEMI join 之后再 left join

 

select k21.vehiclePlate,k22.vehiclePlate from(

    select  k1.vehiclePlate as vehiclePlate from (

           /*ydb.pushdown('->')*/

           select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='1'

           /*('<-')pushdown.ydb*/

    ) k1

    LEFT SEMI JOIN

   (

        /*ydb.pushdown('->')*/

       select vehiclePlate,tollCode from vehiclepass where ydbpartion = '3000w' and tollCode='2' 

       /*('<-')pushdown.ydb*/

    ) k2

    on (k1.vehiclePlate=k2.vehiclePlate)

 ) k21

LEFT JOIN

(

    /*ydb.pushdown('->')*/

    select vehiclePlate,tollCode from vehiclepass  where ydbpartion = '3000w' and tollCode='3' 

    /*('<-')pushdown.ydb*/

 ) k22 on k21.vehiclePlate=k22.vehiclePlate ;

 

 

 +---------------+---------------+--+

| vehiclePlate  | vehiclePlate  |

+---------------+---------------+--+

| c22           | NULL          |

| c23           | NULL          |

| c33           | c33           |

| c34           | c34           |

+---------------+---------------+--+

 

 

5.UNION示例

 

1)--union--統計的結果

 

select sum(cnt) as cnt from

(

 

/*ydb.pushdown('->')*/

 select count(*) as cnt from ydb_example_shu where ydbpartion = '3000w'

/*('<-')pushdown.ydb*/

 

union all

/*ydb.pushdown('->')*/

select count(*) as cnt from ydb_example_shu where ydbpartion = '300winsert'

/*('<-')pushdown.ydb*/

 

union all

/*ydb.pushdown('->')*/

select count(*) as cnt from ydb_example_shu where ydbpartion = '300winsert2'

/*('<-')pushdown.ydb*/

 

union all

/*ydb.pushdown('->')*/

select count(*) as cnt from ydb_example_shu where ydbpartion = '3000w' and content='王老吉' 

/*('<-')pushdown.ydb*/

 

union all

/*ydb.pushdown('->')*/

select count(*) as cnt from ydb_example_shu where ydbpartion = '20151011' and content='工商銀行'

/*('<-')pushdown.ydb*/

 

union all

/*ydb.pushdown('->')*/

 select count(*) as cnt from ydb_example_shu where ydbpartion = '20151011' 

/*('<-')pushdown.ydb*/

 

) tmp limit 10;

 

 

2)--union order by的結果,注意,這里有個子查詢SQL

select * from

(

/*ydb.pushdown('->')*/ s

elect amtlong,content from ydb_example_shu where ydbpartion = '3000w' and content='旺旺' order by amtlong desc limit 1

/*('<-')pushdown.ydb*/

union all

/*ydb.pushdown('->')*/

select amtlong,content from ydb_example_shu where ydbpartion = '3000w' and content='王老吉' order by amtlong desc limit 1 

/*('<-')pushdown.ydb*/

union all

/*ydb.pushdown('->')*/

select amtlong,content from ydb_example_shu where ydbpartion = '3000w' and content='匯源' order by amtlong desc limit 1  

/*('<-')pushdown.ydb*/

union all

/*ydb.pushdown('->')*/

select amtlong,content from ydb_example_shu where ydbpartion = '3000w' and content='哇哈哈' order by amtlong desc limit 1 

/*('<-')pushdown.ydb*/ 

 

) tmp  limit 1000;

 

3)YDB表的多個分區一起查詢,通過IN來實現

 

/*ydb.pushdown('->')*/

select count(*),count(amtdouble),sum(amtdouble),avg(amtdouble),min(amtdouble),max(amtdouble),min(ydb_province),max(ydb_province) from ydb_example_shu where   ydbpartion in (  '3000w0','3000w1' ,'3000w2','3000w3','3000w4','3000w5','3000w6','3000w7','3000w8','3000w9','3000w10' ,'3000w11','3000w12','3000w13','3000w14','3000w15' ,'3000w16'  ,'3000w17','3000w18','3000w19'

,'3000a0','3000a1' ,'3000a2','3000a3','3000a4','3000a5','3000a6','3000a7','3000a8','3000a9','3000a10' ,'3000a11','3000a12','3000a13','3000a14','3000a15' ,'3000a16'  ,'3000a17','3000a18','3000a19'

,'3000b0','3000b1' ,'3000b2','3000b3','3000b4','3000b5','3000b6','3000b7','3000b8','3000b9','3000b10' ,'3000b11','3000b12','3000b13','3000b14','3000b15' ,'3000b16'  ,'3000b17','3000b18','3000b19'

)

/*('<-')pushdown.ydb*/

;

 

6.DISTINCT示例

-----#####如果distinct的數據並不多,可以考慮采用collect_set 性能較好#######

 

1)----####直接count distinct##########

select

    size(collect_set(tmp.ydb_sex)) as dist_sex,

    size(collect_set(tmp.ydb_province)) as dist_province,

    count(*) as cnt,

    count(tmp.amtlong) as cnt_long,

    count(distinct tmp.amtlong) as dist_long

from (

    /*ydb.pushdown('->')*/

        select ydb_sex,ydb_province,amtlong from ydb_example_shu where ydbpartion = '3000w' and content='王老吉'

    /*('<-')pushdown.ydb*/

) tmp limit 10;

 

 

2)----group by 加 count distinct####

select

    tmp.ydb_sex as ydb_sex,

    size(collect_set(tmp.ydb_province)) as dist_province,

    count(*) as cnt,

    count(tmp.amtlong) as cnt_long,

    count(distinct tmp.amtlong) as dist_long

from

(

    /*ydb.pushdown('->')*/

        select ydb_sex,ydb_province,amtlong from ydb_example_shu where ydbpartion = '3000w' and content='王老吉'

    /*('<-')pushdown.ydb*/

) tmp

group by tmp.ydb_sex limit 10;

 

7.行轉列示例

 

select ydb_sex,concat_ws('#', sort_array(collect_set(concat_ws(',',ydb_province,cnt,cntamt,sumamt)))) from (

    /*ydb.pushdown('->')*/

        select ydb_sex,ydb_province,count(*) as cnt,count(amtdouble) as cntamt,sum(amtdouble) as sumamt from ydb_example_shu where ydbpartion = '3000w' group by ydb_sex,ydb_province 

    /*('<-')pushdown.ydb*/

)tmp  group by ydb_sex limit 10;

 

 

select ydb_province,sum(cnt) as scnt,concat_ws('#', sort_array(collect_set(concat_ws(',',ydb_sex,cnt,cntamt,sumamt)))) from (

    /*ydb.pushdown('->')*/

        select ydb_sex,ydb_province,count(*) as cnt,count(amtdouble) as cntamt,sum(amtdouble) as sumamt from ydb_example_shu where ydbpartion = '3000w' group by ydb_sex,ydb_province 

    /*('<-')pushdown.ydb*/

)tmp  group by ydb_province order by scnt desc limit 10;

 

select ydb_province,sum(cnt) as scnt,concat_ws('#', sort_array(collect_set(concat_ws(',',ydb_blood,ydb_sex,cnt,cntamt)))) from (

    /*ydb.pushdown('->')*/

        select ydb_blood,ydb_sex,ydb_province,count(*) as cnt,count(amtdouble) as cntamt from ydb_example_shu where ydbpartion = '3000w' group by ydb_blood,ydb_sex,ydb_province 

    /*('<-')pushdown.ydb*/

)tmp  group by ydb_province order by scnt desc limit 10;

 

select ydb_day,sum(cnt) as scnt,concat_ws('#', sort_array(collect_set(concat_ws(',',ydb_blood,ydb_sex,cnt,cntamt)))) from (

    /*ydb.pushdown('->')*/

        select ydb_day,ydb_sex,ydb_blood,count(*) as cnt,count(amtdouble) as cntamt from ydb_example_shu where ydbpartion = '3000w' group by ydb_day,ydb_sex,ydb_blood

    /*('<-')pushdown.ydb*/

)tmp  group by ydb_day order by scnt desc limit 10;

 

 

8.對於時間的高效處理

我數據里面時間格式是yyyy-MM-dd hh:mm:ss

1)ydb沒有時間類型,應該怎么處理?

 

可以用tlong類型代替時間類型

       存儲的值 轉換成 yyyyMMddhhmmss ,這樣是定長的,而且可讀性好(比unix時間磋可讀性好)

如果時間精度是 秒 ,毫秒,納秒的 話 一定要使用 tlong  (范圍查找比long快很多),如果是天,小時的話,可以使用long 節省存儲空間

 

2)這些用於時間操作的轉換函數我們一定會用到

cast (from_unixtime(unix_timestamp(substring(recevicetime,0,18),'dd-MMM-yy HH.mm.ss'),'yyyyMMddHHmmss') as bigint),

cast (from_unixtime(unix_timestamp(substring(recevicetime,0,18),'dd-MMM-yy HH.mm.ss'),'yyyyMMddHHmm')as bigint),

cast (from_unixtime(unix_timestamp(substring(recevicetime,0,18),'dd-MMM-yy HH.mm.ss'),'yyyyMMddHH')as bigint) ,

 

select (2017-cast(substring('201831198307123487',7,4) as bigint) ) from spark_txt limit 10;

 

 

 

 

9.null值與空值的匹配

1)----匹配空串

/*ydb.pushdown('->')*/

select phonenum,usernick,ydb_sex,ydb_province from ydb_example_shu where ydbpartion = 'nullcheck' and ydb_sex='empty'

/*('<-')pushdown.ydb*/

;

2)--匹配非空值

/*ydb.pushdown('->')*/

select phonenum,usernick,ydb_sex,ydb_province from ydb_example_shu where ydbpartion = 'nullcheck' and  ydb_sex<>'empty'

/*('<-')pushdown.ydb*/

;

 

3)--匹配null值

##############null值的匹配非常消耗性能,采用暴力掃描倒排表的方式實現,如果該列的值排重后的值特別多,如sessionId,身份證號碼,手機號等,請慎用########

####如果是檢索明細數據,建議在hive層進行過濾####

####TODO 未來可以通過標簽里面的live bits改進null值匹配的性能####

 

/*ydb.pushdown('->')*/

select phonenum,usernick,ydb_sex,ydb_province from ydb_example_shu where ydbpartion = 'nullcheck' and ydb_sex='null'

/*('<-')pushdown.ydb*/

 

4)--匹配非null值

 

 

/*ydb.pushdown('->')*/

select phonenum,usernick,ydb_sex,ydb_province from ydb_example_shu where ydbpartion = 'nullcheck' and ydb_sex<>'null'

/*('<-')pushdown.ydb*/

;

 

10.近似文本匹配

 

1)近似文本匹配

有些時候,我們只想找到一篇跟當前指定文章類似的文章。可能中間相差幾個字不一樣無所謂,或者局部的字順序前后顛倒也無所謂。

需要注意

a)單詞會進行排重。

b)並不考慮單詞順序(雖然偽造的數據是有順序的,但是匹配是不考慮順序的)。

c)30表示排重后,至少有30%的單詞會匹配上才算匹配

d)匹配是按照分詞的結果后進行匹配的,並不是按照空格進行拆分的,具體如何檢驗分詞,

 

1:近似文本匹配示例

 

/*ydb.pushdown('->')*/

select  content from ydb_example_shu where ydbpartion = '3000w'  and content='YTermlike@30@100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 ' limit 10

/*('<-')pushdown.ydb*/;

 

 

2)近似特征匹配

有一種搜索是這樣的搜索,我指定一系列的特征,如 高矮、胖瘦、年齡段、性別、時間等一系列目擊者看到的嫌疑人特征,但是有可能有些目擊者描述的不准確,所以不能進行精確匹配,如果能與大部分的匹配條件都相似,一兩個條件沒匹配上,但已經足以相似了,那么也要返回匹配結果。

 

--五個特征中必須匹配4個特征

 

/*ydb.pushdown('->')*/

select ydb_sex,ydb_province,ydb_blood,amtdouble,content from ydb_example_shu where ydbpartion = '3000w'  and ydb_raw_query_s like 'YQuerylike@hits=4&fq=ydb_sex:女&fq=ydb_province:遼寧&fq=amtdouble:[14 TO 200]&fq=ydb_blood:O&fq=content:王老吉'

limit 10

/*('<-')pushdown.ydb*/;

 

==通過wt與score參數將content模糊匹配的權重增大

 

   

/*ydb.pushdown('->')*/

select ydb_sex,ydb_province,ydb_blood,amtdouble,content from ydb_example_shu where ydbpartion = '3000w'  and ydb_raw_query_s like 'YQuerylike@hits=4&score=8&fq=ydb_sex:女&wt=1&fq=ydb_province:遼寧&wt=1&fq=amtdouble:[14 TO 200]&wt=1&fq=ydb_blood:O&wt=1&fq=content:王老吉&wt=9'

limit 10

/*('<-')pushdown.ydb*/;

 

 

 

11.多值列示例

有些時候,我們想在一個列里面存儲多個值的時候,就可以考慮使用多值列了

比如說,可以將一個人 的多個標簽值 存儲在一個記錄里面,一個人的每天的行為數據 放在一個記錄里面。

一定要注意,

1.字符串類型的多值列,返回的值的無序,並且是排重的,故這塊有額外注意。

2.數值型的則是有序的(與導入的順序一致),並且是沒有排重的。

3.傳遞的數值是按照空格 拆分的  ,如 11 22 33 44 

4.如果傳遞的是空值,會當做null處理

5.只要數據類型定義為多之列,程序目前會按照空格識別來將傳入的數據拆分成多個值分別存儲

 

1)--檢索--

 

/*ydb.pushdown('->')*/

select multyvalue_string,multyvalue_tlong,multyvalue_long,multyvalue_tdouble,multyvalue_double from ydb_example_shu_multyvalue where ydbpartion='3000w'

limit 10

/*('<-')pushdown.ydb*/;

 

 

2)--多值列group by

 

/*ydb.pushdown('->')*/

select multyvalue_long,count(*) as cnt from ydb_example_shu_multyvalue where ydbpartion = '3000w'  group by multyvalue_long order by cnt

limit 10

/*('<-')pushdown.ydb*/;

 

 

3)普通列與多值列group by

 

/*ydb.pushdown('->')*/

select multyvalue_long,ydb_sex,count(*) as cnt from ydb_example_shu_multyvalue where ydbpartion = '3000w'  group by multyvalue_long,ydb_sex order by cnt

limit 10

/*('<-')pushdown.ydb*/;

4)--兩個多值列group by ,(笛卡爾集,要注意內存,以及性能,要慎重)

/*ydb.pushdown('->')*/

select multyvalue_long,multyvalue_string,count(*) from ydb_example_shu_multyvalue where ydbpartion = '3000w'  group by multyvalue_long,multyvalue_string order by  multyvalue_long,multyvalue_string

limit 10

/*('<-')pushdown.ydb*/;

 

 

12.地理位置感知搜索

       現在手機APP滿天飛,我想大家都用過這個功能:【搜索我附近的飯店或賓館】之類的功能,類似這樣的地理位置搜索功能非常適用,因為它需要利用到用戶當前的地理位置數據,是以用戶角度出發,找到符合用戶自身需求的信息,應用返回的信息對於用戶來說滿意度會比較高。可見,地理位置空間搜索在提高用戶體驗方面有至關重要的作用。在Lucene中,地理位置空間搜索是借助Spatial模塊來實現的。

       要實現地理位置空間搜索,我們首先需要對地理位置數據創建索引,比較容易想到的就是把經度和緯度存入索引,可是這樣做,有個弊端,因為地理位置數據(經緯度)是非常精細的,一般兩個地點相差就0.0幾,這樣我們需要構建的索引體積會很大,這會顯著減慢你的搜索速度。在精確度上采取折衷的方法通常是將緯度和經度封裝到層中。您可以將每個層看作是地圖的特定部分的縮放級別,比如位於美國中央上方的第 2 層幾乎包含了整個北美,而第 19 層可能只是某戶人家的后院。尤其是,每個層都將地圖分成 2層的箱子或網格。然后給每個箱子分配一個號碼並添加到文檔索引中。如果希望使用一個字段,那么可以使用 Geohash編碼方式將緯度/經度編碼到一個 String 中。Geohash 的好處是能夠通過切去散列碼末尾的字符來實現任意的精度。在許多情況下,相鄰的位置通常有相同的前綴。

 

 

1)測試表的創建,注意使用mortonhash的列的類型是geopoint

create table lonlattable_test(

lon tdouble,

lat tdouble,

mortonhash geopoint

)

 

2)導入數據-注意YMortonHash函數是用於生成Morton數的,將來在索引中用於匹配

insert overwrite table  ydbpartion

select 'lonlattable_test', '3000w', '',

    YROW(

        'lon',r[0],

        'lat',r[1],

        'mortonhash',YMortonHash(r[0],r[1])

    )

from  ydb where YSQL('from','select LAT,LON from ydb_oribit where ydbpartion='20160619' ','segment') ;

 

3)#數據預覽,注意YMortonUnHash用於將數據在還原為經緯度,YMortonHashDistance則用來計算距離,單位是m

select tmp.lon,tmp.lat,tmp.mortonhash,YMortonUnHash(tmp.mortonhash),YMortonHashDistance(tmp.mortonhash,8.1,9.2) as distance from

(

/*ydb.pushdown('->')*/

select lon,lat,mortonhash from lonlattable_test where ydbpartion='3000w'

/*('<-')pushdown.ydb*/

)tmp  order by distance limit 10 ;

 

 

 

4)地理位置檢索,給一個坐標,搜尋最近多少米遠的所有記錄,注意YGeo@的使用

 

 

select tmp.lon,tmp.lat,tmp.mortonhash,YMortonUnHash(tmp.mortonhash),YMortonHashDistance(tmp.mortonhash,8.1,9.2) as distance from

(

/*ydb.pushdown('->')*/

select lon,lat,mortonhash from lonlattable_test where ydbpartion='3000w'  and ydb_raw_query_s like 'YGeo@fl=mortonhash&lon=8.1&lat=9.2&radius=10000'

/*('<-')pushdown.ydb*/

)tmp  order by distance limit 10 ;

 

 

5)####################按照矩形區域搜索isbox=true

 

select tmp.lon,tmp.lat,tmp.mortonhash,YMortonUnHash(tmp.mortonhash),YMortonHashDistance(tmp.mortonhash,8.1,9.2) as distance from

(

/*ydb.pushdown('->')*/

select lon,lat,mortonhash from lonlattable_test where ydbpartion='3000w'  and ydb_raw_query_s like 'YGeo@fl=mortonhash&isbox=true&lon=8.1&lat=9.2&radius=10000'

/*('<-')pushdown.ydb*/

)tmp  order by distance limit 10 ;

 

13.考慮單詞順序的模糊匹配

 

默認YDB提供了simpletex,haoma等類型進行模糊匹配。

他們本質上是通過分詞進行匹配,並不考慮匹配的詞的順序,如果要進行模糊匹配並且又要保證匹配的先后順序,那么就需要在進行中文分詞的時候保存詞的位置。

 

如果保存了順序,我們可以通過Ylike@方法 按照單詞順序進行匹配查詢

如:

phonenum='Ylike@824963'

phonenum='Ylike@188*63*72*76'

phonenum='Ylike@188*2*6*3*6*88'

content='Ylike@可口*可樂*磊'

content='Ylike@14 15 * 24 28 * 37 41  '

 

目前保存詞的位置的數據類型有如下幾種:

charlike: 按照字符char 1~5元分詞 (效果較好,term區分了詞元,適合車牌,手機號類型的較短文本)

wordlike: 按字與詞 1~3元分詞 (效果較好,term區分了詞元,適合文本類型)

pchepai:按照字符char 2~5元分詞

phaoma :按照字符char 3~5元分詞

psimpletext: 按字與詞 1~3元分詞

pyy :lucene的cjk分詞,中文采用二元分詞,英文與數字采用 單字分詞

 

 

 

注意:目前的這種Ylike還實現不了前綴與后綴匹配,如果要進行前綴與后綴匹配,建議在導入數據前,加入前綴與后綴的特殊符號

比如說如果ip地址是192.168.3.40,那么我們可以使用charlike類型的字段,並且導入的時候 加上 start192.168.3.40end ,這樣前后分別由start與end里兩個特殊的字符串

這樣進行前綴匹配的時候,可以通過phonenum='Ylike@start192.168' 來匹配,后綴匹配可以通過 phonenum='Ylike@3.40end' 來進行匹配

 

 

1.##############號碼與車牌類型的示例

 

/*ydb.pushdown('->')*/

select  phonenum from ydb_example_shu_positon where ydbpartion = '3000w'  and phonenum='Ylike@824963'

 limit 10

/*('<-')pushdown.ydb*/

;

 

+------------------+--+

| phonenum  |

+------------------+--+

| 18882496377      |

| 18824963110      |

| 18824963481      |

| 17082496383      |

| 13824963971      |

| 15928249639      |

| 18824963904      |

| 13238249639      |

+------------------+--+

8 rows selected (0.272 seconds)

 

2.#######使用*通配符###

 

 

/*ydb.pushdown('->')*/

select  phonenum from ydb_example_shu_positon where ydbpartion = '3000w'  and phonenum='Ylike@824*963'

 limit 100

/*('<-')pushdown.ydb*/

;

 

+------------------+--+

| phonenum  |

+------------------+--+

| 13824096330      |

| 13824229634      |

| 18824963481      |

| 18824096302      |

| 17082496383      |

| 18824296372      |

| 18824963110      |

| 18824196307      |

| 13238249639      |

| 13824769963      |

| 18824649639      |

| 18882496377      |

| 13482479635      |

| 13824799638      |

| 13824963971      |

| 18824396346      |

| 15928249639      |

| 18824963904      |

| 18898248963      |

+------------------+--+

19 rows selected (0.26 seconds)

 

 

 

 

/*ydb.pushdown('->')*/

select  phonenum from ydb_example_shu_positon where ydbpartion = '3000w'  and phonenum='Ylike@188*63*72*76'

 limit 100

/*('<-')pushdown.ydb*/

;

+------------------+--+

| phonenum  |

+------------------+--+

| 18863872476      |

| 18863767276      |

| 18836372076      |

| 18863726576      |

+------------------+--+

4 rows selected (0.241 seconds)

 

 

3.文本類型順序匹配檢索示例

/*ydb.pushdown('->')*/

select  content from ydb_example_shu_positon where ydbpartion = '3000w'  and content='Ylike@1 5 14 15 24 28 37 41 49'

 limit 100

/*('<-')pushdown.ydb*/

;

4.通過* 允許中間某些詞 不連續,但依然保證順序######

 

/*ydb.pushdown('->')*/

select  content from ydb_example_shu_positon where ydbpartion = '3000w'  and content='Ylike@1 5 14 * 24 28 37'

 limit 100

/*('<-')pushdown.ydb*/

;

 

14.管理員命令

--查看YDB表

/*ydb.pushdown('->')*/

show tables

/*('<-')pushdown.ydb*/

;

 

--查看表的分區

/*ydb.pushdown('->')*/

show partions ydb_example_shu

/*('<-')pushdown.ydb*/

;

 

 

--按條件刪除

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_shu where ydbpartion='3000w' and ydb_sex='男' and ydb_blood='A' and  ydbkv='ydb.delete.query:true'

/*('<-')pushdown.ydb*/

;

 

--整個分區清理,數據清空,但是分區還在

 

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_shu where ydbpartion='3000w' and ydbkv='ydb.truncate:true'

/*('<-')pushdown.ydb*/

;

 

 

--物理清理掉整個分區的數據(清理后分區也跟着刪掉)

/*ydb.pushdown('->')*/

 drop table ydb_example_shu partions 3000a4

/*('<-')pushdown.ydb*/

;

 

/*ydb.pushdown('->')*/

 drop table ydb_example_shu partions 3000a4,3000a5,3000a6

/*('<-')pushdown.ydb*/

;

 

 

--刪除一個表的所有分區-保留表結構

/*ydb.pushdown('->')*/

 truncate table ydb_example_shu

/*('<-')pushdown.ydb*/

;

 

--刪除一個表,表結構也刪除掉

/*ydb.pushdown('->')*/

 drop table ydb_example_shu

/*('<-')pushdown.ydb*/

;

--暫停kafka的消費3600秒

/*ydb.pushdown('->')*/

select count(*) from y_system_log where ydbkv='ydb.reader.pause:true' and ydbkv='ydb.reader.pause.secs:3600'

/*('<-')pushdown.ydb*/

;

 

--恢復 暫停的kafka的消費,讓kafka繼續消費數據

/*ydb.pushdown('->')*/

'select count(*) from y_system_log where  ydbkv='ydb.reader.pause:true' and ydbkv='ydb.reader.pause.secs:0'

/*('<-')pushdown.ydb*/

;

 

--將binlog立即刷到磁盤上

 

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_trade where ydbpartion='k25_005_0' and ydbkv='ydb.force.sync.binlog:true'

/*('<-')pushdown.ydb*/

;

 

--刷新緩沖區的數據,讓其能被搜索到,(binlog會持久化但數據並不會立即持久化到hdfs)

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_trade where ydbpartion='k25_005_0' and ydbkv='ydb.force.sync.ram:true'

/*('<-')pushdown.ydb*/

;

 

--主動觸發,將內存中的數據刷盤操作,(數據會被搜索到,並且持久化到磁盤)

 

/*ydb.pushdown('->')*/

select count(*) from ydb_example_trade where ydbpartion='k25_005_0' and ydbkv='ydb.force.sync:true'

/*('<-')pushdown.ydb*/

;

 

15.變通方式的分頁方案

       默認Spark SQL無法進行分頁,YDB由於使用了Spark也存在這個問題,故我們采取了變通方式來實現分頁。

以每頁pagesize大小為10為例

 

######1024條記錄以內####

第一頁 直接limit 10,並且將每一行的數據,都按pagekey取個crc32的值 存儲在lru的hashmap中

第二頁 直接limit 20,並且根據第一頁的crc32與當前的20條記錄進行移除,有可能剩余12條或更多,但至少剩余10條,然后取出10條返回,並且將crc32緩存在LRU的hashmap中

第三頁 直接limit 30,同第二頁一樣,移除掉與crc32匹配的記錄,返回10條並且添加第三頁的crc32

這樣一直處理到1024條記錄 ,如果同時能處理1024個session,我們認為內存是能夠放1024*1024個crc32的long類型

 

######超過1024條記錄######

我們采用導出成文件的方式,即insert overwrite table的方式,但是考慮到insert 的方式響應可能會很慢,故我們改寫了outputformat,也就是后面大家看到的YdbMoreTextOutputFormat

通過YdbMoreTextOutputFormat我們不需要等待這個insert overwrite執行完畢后才返回,而是在YdbMoreTextOutputFormat中將少量數據發送到緩沖區供立即返回,而大量數據寫入到磁盤。

 

在這種方式下,由於需要與先前生成的crc32值進行排重,目前的實現沒有精確控制返回的數據條數,而是返回介於pagesize到pagesize*2的記錄數,

 

 

 

----這個分頁的使用限制大家注意--

1.不能跳頁,只能一頁一頁的向下翻。

2.只能向后翻頁,不能向前翻頁。

3.每頁返回的行數是一個近似值,介於pagesize到pagesize*2的記錄數。

4.SQL本身就不在需要寫limit了

4.sql中的 as pagekey與pagevalue 不能省略,本質是KV返回

 

 

如果數據表的規模很大,建議配置如下參數控制每個segments導出的記錄條數,以免占用太多的HDFS空間

and ydbkv="export.max.return.docset.size:1000" and ydbkv="max.return.docset.size:1000"

 

###使用方法##

 

---先創建如下的表

drop table ydb_page_session;

 CREATE external  table ydb_page_session(

pagekey string,

pagevalue string

)

partitioned by (ydbsession string)

stored as INPUTFORMAT 'cn.net.ycloud.ydb.handle.Ya100FixNumCombineTextInputFormat' OUTPUTFORMAT 'cn.net.ycloud.ydb.handle.more.YdbMoreTextOutputFormat'

location '/data/ycloud/ydb/rawdata/ydb_page_session';

 

drop table ydb_more_session;

 CREATE external  table ydb_more_session(

line string

)

partitioned by (ydbsession string)

stored as INPUTFORMAT 'cn.net.ycloud.ydb.handle.Ya100FixNumCombineTextInputFormat' OUTPUTFORMAT 'cn.net.ycloud.ydb.handle.more.YdbMoreTextOutputFormat'

location '/data/ycloud/ydb/rawdata/ydb_more_session';

 

 

 

---通過如下接口查詢數據----

 

注意生成的pagekey不能省略,用於排重,如果是查詢明細可以用y_uuid_s的內置列填充。

 

 http://ydbmaster:1210/ydbpage?reqid=002_page&pagesize=100&sql=select r[0] as pagekey,concat_ws(',',r) as pagevalue from  ydb where YSQL('from','select y_uuid_s,phonenum,usernick,content from ydb_example_shu where ydbpartion="3000w" and content="王老吉" and ydbkv="export.max.return.docset.size:1000" and ydbkv="max.return.docset.size:1000"  ','segment')

 http://ydbmaster:1210/ydbpage?reqid=00d3_page&pagesize=100&sql=select r[0] as pagekey,concat_ws(',',r) as pagevalue from  ydb where YSQL('from','select y_uuid_s,phonenum,usernick from ydb_example_shu where ydbpartion="3000w"  and ydbkv="export.max.return.docset.size:1000" and ydbkv="max.return.docset.size:1000"  ','segment')

 http://ydbmaster:1210/ydbpage?reqid=0234_page&pagesize=100&sql=select concat_ws(',',r[0],r[1],r[2]) as pagekey,concat_ws(',',r) as pagevalue from ydb where YSQL('from','select amtdouble,amtlong,y_uuid_s,content,usernick,ydb_sex from ydb_example_shu where ydbpartion="3000w" and ydbkv="export.max.return.docset.size:1000" and ydbkv="max.return.docset.size:1000"  ','segment')

 

 

16.分詞

 

分詞( Word Segmentation) 指的是將一個詞字序列切分成一個一個單獨的詞。分詞就是將連續的詞序列按照一定的規范重新組合成詞序列的過程。我們知道,在英文的行文中,單詞之間是以空格作為自然分界符的,而中文只是字、句和段能通過明顯的分界符來簡單划界,唯獨詞沒有一個形式上的分界符,雖然英文也同樣存在短語的划分問題,不過在詞這一層上,中文比之英文要復雜的多、困難的多。

 

 

默認YDB提供了如下幾種分詞

 

simpletext ydb內置的普通文本分詞 采用1~3元分詞

haoma ydb內置的適合號碼類型的分詞,采用3~5元分詞實現,分詞粒度為char

chepai ydb內置的適合號碼類型的分詞,采用2~5元分詞實現,分詞粒度為char

text 為lucene默認的standard分詞,在(處理手機號,郵箱,IP地址,網址等中英文與字典組合的數據上 不准確,請慎用)

cjkyy 為lucene默認的cjk分詞即二元分詞 (處理手機號,郵箱,IP地址,網址等中英文與字典組合的數據上 不准確,請慎用)

ikyy與textik 為開源的ik分詞的實現,采用詞庫分詞,詞庫我們可以再lib下找到

 

 

以下類型除了分詞外,還保存了分詞后的詞的順序,可以進行順序匹配 更多請參考《27.考慮單詞順序的模糊匹配》

charlike: 按照字符char 1~5元分詞 (效果較好,term區分了詞元,適合車牌,手機號類型的較短文本)

wordlike: 按字與詞 1~3元分詞 (效果較好,term區分了詞元,適合文本類型)

pchepai:按照字符char 2~5元分詞

phaoma :按照字符char 3~5元分詞

psimpletext: 按字與詞 1~3元分詞

pyy :lucene的cjk分詞,中文采用二元分詞,英文與數字采用 單字分詞

 

 

--我們可以通過如下SQL 了解不同分詞的差異

 

 

 

select

    YAnalyzer('charlike','query','中華人民123456') as charlikequery,

    YAnalyzer('charlike','index','中華人民123456') as charlikeindex

from (

    /*ydb.pushdown('->')*/ 

    select content from ydb_example_shu_multyvalue where ydbpartion='3000w'

    /*('<-')pushdown.ydb*/

) tmp

   

limit 1;

 

 

 

 

select

 

    YAnalyzer('wordlike','query','中華人民123456') as wordlikequery,

    YAnalyzer('wordlike','index','中華人民123456') as wordlikeindex

from

    (

    /*ydb.pushdown('->')*/ 

    select content from ydb_example_shu_multyvalue where ydbpartion='3000w'

    /*('<-')pushdown.ydb*/

) tmp

limit 1;

 

+-------------------------+---------------------------------------------------------------------------+--+

|      wordlikequery      |                               wordlikeindex                               |

+-------------------------+---------------------------------------------------------------------------+--+

| 3@中華人 3@華人民 3@人民123456  | 1@中 1@華 1@人 1@民 1@123456 2@中華 2@華人 2@人民 2@民123456 3@中華人 3@華人民 3@人民123456  |

+-------------------------+---------------------------------------------------------------------------+--+

1 row selected (0.889 seconds)

 

 

select

 

    YAnalyzer('phaoma','query','中華人民123456') as phaomaquery,

    YAnalyzer('phaoma','index','中華人民123456') as phaomaindex

from

    (

    /*ydb.pushdown('->')*/ 

    select content from ydb_example_shu_multyvalue where ydbpartion='3000w'

    /*('<-')pushdown.ydb*/

) tmp

limit 1;

 

 

+--------------------------------------+---------------------------------------------------------------------------------------------------------+--+

|             phaomaquery              |                                               phaomaindex                                               |

+--------------------------------------+---------------------------------------------------------------------------------------------------------+--+

| 中華人民1 華人民12 人民123 民1234 12345 23456  | 中華人 華人民 人民1 民12 123 234 345 456 中華人民 華人民1 人民12 民123 1234 2345 3456 中華人民1 華人民12 人民123 民1234 12345 23456  |

+--------------------------------------+---------------------------------------------------------------------------------------------------------+--+

 

 

 

select

    YAnalyzer('psimpletext','query','中華人民123456') as psimpletextquery,

    YAnalyzer('psimpletext','index','中華人民123456') as psimpletextindex

from

    (

    /*ydb.pushdown('->')*/ 

    select content from ydb_example_shu_multyvalue where ydbpartion='3000w'

    /*('<-')pushdown.ydb*/

) tmp

limit 1;

 

+-------------------+---------------------------------------------------+--+

| psimpletextquery  |                 psimpletextindex                  |

+-------------------+---------------------------------------------------+--+

| 中華人 華人民 人民123456  | 中 華 人 民 123456 中華 華人 人民 民123456 中華人 華人民 人民123456  |

+-------------------+---------------------------------------------------+--+

 

 

ik詞庫分詞

詞庫文件位於 ya100/lib/IK_ext.dic

 

 

17.with as 寫法簡化SQL

       如果我們的SQL,嵌套層級太深,可以考慮通過with as 方法,將子SQL抽取出來,讓整體的SQL看起來邏輯更清晰,大家閱讀SQL的時候也便於理解。

 

       with as 遵循HIVE語法,下面為寫法示例

       Hive 可以用with as將某個查詢命名為一個臨時的表名,其他語句可以隨時使用該臨時表名進行查詢。

with q1 as (select * from src where key= ‘5’),

q2 as (select * from src s2 where key = ‘4’)

select * from q1 union all select * from q2;

 

 

一個簡單的例子

 

with

y_customer as (

/*ydb.pushdown('->')*/

select c_custkey from customer_ydb where c_mktsegment = 'BUILDING'

/*('<-')pushdown.ydb*/

),

y_lineitem as (

/*ydb.pushdown('->')*/

select l_orderkey,l_extendedprice,l_discount from lineitem_ydb where l_shipdate > '1995-03-15'

/*('<-')pushdown.ydb*/

)

,

y_orders as (

/*ydb.pushdown('->')*/

select o_orderdate, o_shippriority,o_orderkey,o_custkey  from orders_ydb where o_orderdate < '1995-03-15'

/*('<-')pushdown.ydb*/

)

select

  l_orderkey, sum(l_extendedprice*(1-l_discount)) as revenue, o_orderdate, o_shippriority

from

  y_customer c join y_orders o

    on  c.c_custkey = o.o_custkey

  join y_lineitem l

    on l.l_orderkey = o.o_orderkey

where

  o_orderdate < '1995-03-15'

group by l_orderkey, o_orderdate, o_shippriority

order by revenue desc, o_orderdate ,l_orderkey, o_shippriority

limit 10;

 

一個稍微復雜點的例子

 

with

y_nation as (

/*ydb.pushdown('->')*/

select n_name,n_regionkey,n_nationkey from nation_ydb

/*('<-')pushdown.ydb*/

),

y_region as (

/*ydb.pushdown('->')*/

select r_regionkey,r_name from region_ydb where r_name = 'EUROPE'

/*('<-')pushdown.ydb*/

),

y_supplier as (

/*ydb.pushdown('->')*/

select   s_acctbal, s_name,s_address,s_phone, s_comment ,s_nationkey,s_suppkey

 from supplier_ydb 

/*('<-')pushdown.ydb*/

),

y_partsupp as (

/*ydb.pushdown('->')*/

select   ps_supplycost,ps_suppkey,ps_partkey

 from partsupp_ydb 

/*('<-')pushdown.ydb*/

),

y_part as (

/*ydb.pushdown('->')*/

select   p_partkey,p_mfgr,p_size,p_type

 from part_ydb   where p_size = 15 and p_type like '%BRASS'

/*('<-')pushdown.ydb*/

),

 

q2_minimum_cost_supplier_tmp1 as (select

  s.s_acctbal, s.s_name, n.n_name, p.p_partkey, ps.ps_supplycost, p.p_mfgr, s.s_address, s.s_phone, s.s_comment

from

  y_nation n join y_region r

  on 

    n.n_regionkey = r.r_regionkey

  join y_supplier s

  on 

s.s_nationkey = n.n_nationkey

  join y_partsupp ps

  on 

s.s_suppkey = ps.ps_suppkey

  join y_part p

  on 

    p.p_partkey = ps.ps_partkey ),

 q2_minimum_cost_supplier_tmp2 as (

select

  p_partkey, min(ps_supplycost)  as ps_min_supplycost

from 

  q2_minimum_cost_supplier_tmp1 

group by p_partkey

)

select

  t1.s_acctbal, t1.s_name, t1.n_name, t1.p_partkey, t1.p_mfgr, t1.s_address, t1.s_phone, t1.s_comment

from

  q2_minimum_cost_supplier_tmp1 t1 join q2_minimum_cost_supplier_tmp2 t2

on

  t1.p_partkey = t2.p_partkey and t1.ps_supplycost=t2.ps_min_supplycost

order by s_acctbal desc, n_name, s_name, p_partkey,p_mfgr,s_address,s_phone,s_comment

limit 100;

 

 

十、通過Kafka實時導入數據

默認的Kafka導入數據只支持Json格式,如果需要支持其他格式,需要自己通過Java寫Parser.。

1.Kafka配置的注意點

第一:注意Kafka server 的num.partitions一定要大於YDB啟動的進程*線程數量,否則有的進程消費不到數據

如果發現之前配置錯了,要清空下ZK相關路徑(現在好像有API接口了),否則修改完了也不生效

Kafka的啟動很簡單 ./kafka-server-start.sh ../config/server.properties

第二:請參考第四章的Kafka配置注意事項,這里不再重復介紹

 

2.YDB中配置Kafka的消費

在ydb_site.yaml中添加如下的配置,並更改相關連接參數

 

 ydb.reader.list: "default,filesplit,kafka_json"

 

 #如果您要使用其他的消息中間件,可以自定義Reader,默認為Kafka的實現

 ydb.reader.read.class.kafka_json: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

 

 #如果您的數據格式是非標准的JSON格式,可以自定義Parser,默認為按照Json方式解析

 ydb.reader.parser.class.kafka_json: "cn.net.ycloud.ydb.server.reader.JsonParser"

 

 kafka.topic.kafka_json: "kafkaydb"

 kafka.group.kafka_json: "kafkaydb_group"

 bootstrap.servers.kafka_json: "192.168.3.2:6667"

 

3.重啟YDB后,10分鍾后會開始導入數據

4.Kafka導入的數據格式如下

1)一次只導入一條

{"tablename":"ydbexample","ydbpartion":"20151005","data":{"indexnum":4,"label":"l_4","userage":14,"clickcount":4,"paymoney":4.132,"price":4.12,"content":"4 4 4 延雲 ydb 延雲  測試  中文分詞 中華人民共和國 沈陽延雲雲計算技術有限公司","contentcjk":"4 4 4 延雲 ydb 延雲  測試  中文分詞 中華人民共和國 沈陽延雲雲計算技術有限公司"}}

2)一次導入多條(通過減少數據條數,提升Kafka性能)

{"tablename":"ydb_example_trade","ydbpartion":"20151018","list":[{"tradeid":"2016030811044018","nickname":"凌浦澤","province":"澳門特別行政區","tradetype"

...... ",\"amt\":8321,\"bank\":\"交通銀行\"}"}]}

 

 

 

 

 

5.Kafka模式實時導入數據,為什么會有重復數據

YDB能確保從Kafka消費到的數據0丟失,但是由於Kafka的實現機制,以下情況會導致出現重復數據

具體原理可以參考這篇文章

http://www.iteblog.com/pdf/1716

進程異常退出(主動kill或者進程BUG等原因導致)

       Kafka采用commitoffset的方式提交數據,由於此時會存在數據已經消費,但是Kafka的offset沒有來得及提交,這樣會導致數據重復。

Kafka的Rebalancing

       由於進程退出,或者首次啟動時,會產生一個新的consumer加入一個consumer group時,會有一個rebalance的操作,導致每一個consumer和partition的關系重新分配。也就會發生了rebalancing 。

       如果一個消息已經被消費了,但是還沒有提交offset,就開始了Rebalancing,這個時候會造成數據的重復。這個在Kafka里已經積累了一部分數據后的首次啟動時最為明顯。

如果僅僅發生了一個進程異常退出,但是沒有導致Rebalancing,那么最多重復的數據條數就是這個進程還沒有來得及提交的部分。

如果發生了Rebalancing(進程異常退出也會導致Rebalancing),那么則要按全部沒有來得及提交的線程數來計算。

 

6.Kafka模式對數據可靠性的幾種配置

ydb.realtime.kafka.commit.intervel用來控制Kafka的offset commit頻率,每次commit也會導致binglog(wal)同步,所以該值一般要小於等於ydb.realtime.binlog.sync.intervel的頻率

 

 

盡量減少進程重啟 導致的數據重復的配置(每個線程32條重復數據)

ydb.realtime.kafka.commit.intervel: 32

ydb.realtime.binlog.sync.intervel: 1024

 

 

盡量增加吞吐量的配置, 可能有重復(每個線程1024條重復數據)

 

ydb.realtime.kafka.commit.intervel: 1024

ydb.realtime.binlog.sync.intervel: 2048

7.多個Kafka Topic一起消費

ydb.reader.list: "default,filesplit,kafka_json,kafka_json2,kafka_json3"

 

 ##kafka_json##

 ydb.reader.read.class.kafka_json: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

 ydb.reader.parser.class.kafka_json: "cn.net.ycloud.ydb.server.reader.JsonParser"

 kafka.topic.kafka_json: "a961"

 kafka.group.kafka_json: "bn961n_groupv1_kafka_json"

 bootstrap.servers.kafka_json: "192.168.3.2:6667"

 

 ##kafka_json2##

 ydb.reader.read.class.kafka_json2: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

 ydb.reader.parser.class.kafka_json2: "cn.net.ycloud.ydb.server.reader.JsonParser"

 kafka.topic.kafka_json2: "b961"

 kafka.group.kafka_json2: "bn961n_groupv1_kafka_json2"

 bootstrap.servers.kafka_json2: "192.168.3.2:6667"

 

 ##kafka_json3##

 ydb.reader.read.class.kafka_json3: "cn.net.ycloud.ydb.server.reader.kafka.KafkaDataReader"

 ydb.reader.parser.class.kafka_json3: "cn.net.ycloud.ydb.server.reader.JsonParser"

 kafka.topic.kafka_json3: "c961"

 kafka.group.kafka_json3: "bn961n_groupv1_kafka_json3"

 bootstrap.servers.kafka_json3: "192.168.3.2:6667"

 

十一、通過mdrill提升簡單查詢的查詢速度

       默認配置所有的SQL查詢均是通過spark SQL進行相應,但是由於SPARK本身的框架很重,每次任務調度都會有200~300毫秒的調度時間,對於簡單查詢來說,我們可以將簡單查詢的SQL通過輕量級的mdrill接口進行訪問,而不通過SPARK SQL進行調度,從而在調度上節省時間。

       默認簡單SQL通過mdrill來執行這個功能是關閉的,大家可以通過下述方式,將一個簡單SQL轉換為mdrill查詢。

1.通過統一配置

       可以在ydb_site.yaml里配置 ydb.sql.ismdrill.first的值為true,讓ydb系統自動選擇是使用mdrill來進行查詢還是使用spark調度來執行SQL。

       通過該配置,數據明細查詢、排序,以及不含有group by的統計會通過mdrill查詢。

       group by由於mdrill的一萬個group 的限制,該方式不會啟用。

 

2.通過ydb_force_to_mdrill_mark讓SQL以mdrill的方式顯示執行

如果ydb.sql.ismdrill.first為false,不會使用mdrill的調度,但如果我們在SQL加上該標記,就會強制該SQL采用mdrill的調度,而不是采用spark的調度。

 

示例如下:

/*ydb.pushdown('->')*/

select ydb_sex, count(*) as cbt from ydb_example_shu where ydbpartion='20150811' and ydbkv='mdrill.force:ydb_force_to_mdrill_mark' group by ydb_sex limit 10

/*('<-')pushdown.ydb*/

 

3.通過ydb_force_to_spark_mark讓SQL以spark的方式執行

/*ydb.pushdown('->')*/

select ydb_sex, count(*) as cbt from ydb_example_shu where ydbpartion='20150811' and ydbkv='mdrill.force:ydb_force_to_spark_mark' group by ydb_sex limit 10

/*('<-')pushdown.ydb*/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM