pig學習


一、pig:

  pig提供了一個基於Hadoop的並行地執行數據流處理的引擎。它包含了一種腳本語言,稱為Pig Latin。(類似SQL)

二、pig本地安裝(僅用於本地小代碼測試):

下載地址:https://mirrors.tuna.tsinghua.edu.cn/apache/pig/pig-0.17.0/

創建Pig安裝目錄並解壓到該目錄下:

mkdir /opt/pig_home

tar -zxvf pig-0.17.0.tar.gz -C /opt/pig_home

設置環境變量:

vim /etc/profile

加入

export PIG_HOME=/opt/pig_home/pig-0.17.0

export PATH=$PATH:$PIG_HOME/bin

source /etc/profile

檢測是否成功:

pig –x local  :即可進入pig交互式界面。

 

三、Pig Latin語法:

1、數據類型:

基本類型: int 、long、float、double、chararray、bytearray

復雜類型:Map、Tuple、Bag

Tuple:行/記錄,有序的字段集合,如('bob',55)為一個包含兩個字段的tuple常量。

Bag:表,無序的tuple集合,如{('bob',55),('sally',52),('john',25)}為一個包含三個tuple的bag.    [ pig中沒有list或者set類型,所以常將一個int字段放入tuple中,再放入bag中,如{(1),(2),(3)} ]

Field:屬性/字段,Pig不要求同一個bag里的每個tuple有相同數量或者相同類型的field

pig中有null值概念,其表示該值是未知的,可能是因為確實或者處理數據時發生了錯誤。

關系(relation)、包(bag)、元組(tuple)、字段(field)、數據(data)的關系

    • 一個關系(relation)是一個包(bag),更具體地說,是一個外部的包(outer bag)。
    • 一個包(bag)是一個元組(tuple)的集合。在pig中表示數據時,用大括號{}括起來的東西表示一個包——無論是在教程中的實例演示,還是在pig交互模式下的輸出,都遵循這樣的約定,請牢記這一點,因為不理解的話就會對數據結構的掌握產生偏差。
    • 一個元組(tuple)是若干字段(field)的一個有序集(ordered set)。在pig中表示數據時,用小括號()括起來的東西表示一個元組。
    • 一個字段是一塊數據(data)。

注意:強制轉化時,只轉化能匹配轉化的數據,不能匹配轉化的保留。比如數據格式如下:

false,1

false,2

false,3

false,q

false,c

false,4

A = load 'input.txt' USING PigStorage(',') AS (a:chararray, b:chararray);

B = FILTER A BY (int)b IS NULL;

DUMP B;

##結果
false,q
false,c

 

https://www.cnblogs.com/lishouguang/p/4559279.html

2、注釋:

    單行:--  

    多行:/* */

3、輸入和輸出:

    加載:load '文件'

        using PigStorage(',');

        as (exchange:int,symbol:long,date:int,dividends:chararry);

         //使用內置函數PigStorage函數,指定分隔符為',';還有一個加載函數是TextLoader。

        //采用as指定加載數據的模型。

 

    存儲:store 變量 into '輸出文件' using PigStorage(','); # 按照逗號,分割符來存儲。

按照Map格式輸出到文件:https://stackoverflow.com/questions/22416961/how-can-i-using-pig-scripts-to-generate-nested-avro-field

    輸出:dump 變量;//打印 ,只有dump 或者descrip 描述的時候才會執行代碼,若之前有group 等等語句,只是把他們加入到邏輯計划中,pig開始執行的是dump 語句,此時邏輯計划被編譯成物理計划。

    查看數據關系:describe 變量;//用於查看變量字段關系。[直接快速將變量schema打印出來,比dump速度快]

 

注意:如果數據中格式與load定義的數據格式不合適,對應的特征值會置空。

 如:數據test.txt中格式:

1_2 , 1 , 2 , 3

1 , 2_1 , 3 , 4

pig加載數據:

A = load 'test.txt' using PigStorage(',') AS (a:int, b:int, c:int, d:int);
DUMP A;

##結果為
( ,1,2,3)
(1, ,3,4)

若加載數據為:

A = load 'test.txt' using PigStorage(',') AS (a:chararray, b:chararray, c:int, d:int);
DUMP A;

##結果為
(1_2,1, 2,3)
(1, 2_1,3,4)

 

數據格式:

https://m.656463.com/wenda/zbsyPigStoragejzdtsj_496?ivk_sa=1023345p

(1,[open#apache])
(2,[apache#hadoop])
A = LOAD 'data' USING PigStorage(' ') as (a:tuple(col1:int, M:map []));
DUMP A;
 
#would give you 
((1,[open#apache]))
((2,[apache#hadoop]))

 

數據格式:(

cat data;
(3,8,9) (4,5,6)
(1,4,7) (3,7,5)
(2,5,8) (9,5,8)

A = LOAD 'data' AS (t1:tuple(t1a:int, t1b:int,t1c:int),t2:tuple(t2a:int,t2b:int,t2c:int));

DUMP A;
((3,8,9),(4,5,6))
((1,4,7),(3,7,5))
((2,5,8),(9,5,8))

X = FOREACH A GENERATE t1.t1a,t2.$0;

DUMP X;
(3,4)
(1,3)
(2,9)

 

 

4、關系操作:

foreach、Filter、Group、Order、Distinct、Join、Limit

5foreach:

逐行掃描進行某種處理,接受一組表達式,然后將它們應用到每條記錄中。

  • 比如:加載完所有記錄,只保留user和id兩個字段。
A = load 'input' as (user:chararray , id:long , address:chararray, password: long, date: int, name: chararray);
B = foreach A generate user,id;
  • 可以用*代表全部字段或者..來指定字段區間。
A = load 'input' as (user:chararray , id:long , address:chararray, password: long, date: int, name: chararray);

B = foreach A generate ..password;  --[產生user , id , address, password字段]

C = foreach A generate address..date;  --[產生address, password,date字段]
  • 兩列操作:
A = load 'input' as (a:int , b:int , c:int);

B = foreach A generate b - c;

C = foreach A generate $1 - $2; -- [B 和C 是一樣的,$0表示第一個字段,以此類推]
  • foreach 語句中的UDF(自定義函數),如:A = load 'input' as (a,b); B = foreach data generate UPPER(a) as  a, b;
  • 類型轉換:直接在字段前面加(類型),如:time字段原本的類型為chararray, (int)time即可。
  • +/-/*//加減乘除都可以用。
  • ?:像C++中的一樣用。2==2?1:4返回1.
  • tuple的映射是用.(點),如
A = load 'input' as (t:tuple(x:int,y:int));

B = foreach A generate b - c;
  • bag不能直接映射tuple,可映射tuple內的字段,生成新的bag。

如 A 為{group:int,data:{(a:int,b:int,c:int)}},--數據為 

 

B = foreach A generate data.a;  --B:{{(a:int)}} 數據為

({(1),(1)})

({(3),(3)})

({(5)})

C = foreach A generate data.(a,b); --C:{{(a:int,b:int)}}, 數據為

({(1,4),(1,2)})

({(3,1),(3,4)})

({(5,6)})

以下是錯誤的:

data = load 'input.txt' as (a:int, b: int, c: int); --data: {a:int,b:int,c:int} ,數據為
1,2,6)

(3,4,5)

(5,6,2)

(1,4,2)

(3,1,3)

group_data = group data by a;  -- 產生包含對於a給定的值對應的所有記錄的bag ,{group:int,data:{(a:int,b:int,c:int)}},數據為
1,{(1,4,2),(1,2,6)})

(3,{(3,1,3),(3,4,5)})

(5,{(5,6,2)})
D = foreach group_data generate SUM(data.b+ data.c); 

應該修改成:

data = load 'input.txt' as (a:int, b: int, c: int); 

A1 = foreach data generate a, b + c as bc;  -- A1:{a:int,bc:int},數據為

(1,8)

(3,9)

(5,8)

(1,6)

(3,4)

B1 = group A1 by a; --B1:{group:int,A1:{(a:int,bc:int)}} ,數據為 
1,{(1,6),(1,8)})

(3,{(3,4),(3,9)})

(5,{(5,8)})
C = foreach B1 generate SUM(A1.bc); -- C:{long},數據為
14)

(13)

(8

 

    Order by:

    Distinct:

    Join:

    Limit:

 

6、filter:

不可以在generate中使用。

  • 過濾出字段name為非空值的行。如:divs = filter data by name is not null;
  • 匹配正則的行,獲取字段name中不是BOB.*這種形式的。如:divs = filter data by not name matches 'BOB.*';  [ and / or / not布爾操作符 , and若執行第一個邏輯為false,后面就不執行了 ]

7、Group by :

可以將具有相同鍵值的數據聚合在一起。https://www.cnblogs.com/lishouguang/p/4559593.html

group by語句的輸出結果包含兩個字段,一個是鍵,另一個是包含了聚集的記錄的bag。存放鍵的字段別名為group。而bag的別名和被分組的那條語句的別名。

注意:這里group的key字段類型不能是bag。

  • 如:將兩個key組合group。
data = load 'input.txt' as (a:int, b: int, c: int); -- data:{a:int, b:int, c:int} ,數據為
1,2,6)

(3,4,5)

(5,6,2)

(1,4,2)

(3,1,3)
two_key_group = group data by (a,b);
,數據為

 

若要獲取group.a和group.b可以通過flatten(group)來得到,即

flatten_data = FOREACH two_key_group generate flatten(group), data.a,data.b;
,數據為

 

  • 如:group all,  對用戶的數據流中所有字段進行分組,原本bag數據值沒變化,只是順序打亂了。
all_group = group data all;  
,數據為
  • 如:log = FOREACH (GROUP log ALL) GENERATE FLATTEN(log);  [在將order_log存入文件之前執行這句話的作用是希望將order_log只存到一個文件中,而不是多個文件中,因為pig存文件時會將一個變量拆分成多個文件來存]

8、order by:

默認升序,降序采用desc。

-- data:

(1,2,6)
(3,4,5)
(5,6,2)
(1,4,2)
(3,1,3)

order_data = order data by a desc, b; 

(5,6,2)
(3,1,3)
(3,4,5)
(1,2,6)
(1,4,2)

9、distinct, 去重。

data = load 'input'
uniq = distinct data; 

10、join

將兩個表連接起來,其中采用::來獲取某個表的某個字段,如表A的a字段和表B的a字段分別為,A::a和B::a。

https://www.cnblogs.com/lishouguang/p/4559602.html

jnd = join a by f1, b by f2;

 
join操作默認的是內連接,只有兩邊都匹配才會保留
 
需要用null補位的那邊需要知道它的模式:
如果是左外連接,需要知道右邊的數據集的模式,不匹配的字段用null補位
如果是右外連接,需要知道左邊的數據集的模式,不匹配的字段用null補位
如果是全外連接,需要知道兩邊的數據集的模式,不匹配的字段用null補位
 
觸發reduce階段
基本用法:
a = load 'input1';
b = load 'input2';
jnd = join a by $0, b by $1;

多字段連接:

a = load 'input1' as (username, age, city);
b = load 'input2' as (orderid, user, city);
jnd = join a by (username, city), b by (user, city);

:: join后的字段引用

a = load 'input1' as (username, age, address);
b = load 'input2' as (orderid, user, money;
jnd = join a by username, b by user;
result = foreach jnd generate a::username, a::age, address, b::orderid;

多數據集連接

a = load 'input1' as (username, age);
b = load 'input2' as (orderid, user);
c = load 'input3' as (user, acount);
jnd = join a by username, b by user, c by user;

外連接 僅限兩個數據集

a = load 'input1' as (username, age);
b = load 'input2' as (orderid, user);
jnd = join a by username left outer, b by user;
jnd = join a by username right, b by user;
jnd = join a by username full, b by user;

自連接 需要加載自身數據集兩次,使用不同的別名

a = load 'data' as (node, parentid, name);
b = load 'data' as (node, parentid, name);
jnd = join a by node, b by parentid;

 

https://www.aboutyun.com/thread-14881-1-1.html

1) Replicated Join
  當進行Join的一個表比較大,而其他的表都很小(能夠放入內存)時,Replicated Join會非常高效。
  Replicated Join會把所有的小表放置在內存當中,然后在Map中讀取大表中的數據記錄,和內存中存儲的小表的數據進行Join,得到Join結果,無需Reduce。
  可以在Join時使用 Using 'replicated'語句來觸發Replicated Join,大表放置在最左端,其余小表(可以有多個)放置在右端。

  2) Skewed Join
當進行Join的兩個表中,一個表數據記錄針對key的分布極其不均衡的時候,簡單的使用Hash來分配Reduce端的key時,可能導致某些Reducer上的數據量特別大,降低整個集群的性能。
  Skewed Join可以首先對左邊的表的key統計其分布,然后決定Reduce端的key的分布,盡量使得Reduce端的數據分布比較均衡
  可以在Join時使用Using 'skewed'語句來觸發Skewed Join,需要進行統計的表(亦即key可能分布不均衡的表)放置在左端。

  3) Merge Join
當進行Join的兩個表都已經是有序的時,可以使用Merge Join。
  Join時,首先對右端的表進行一次采樣,對采樣的數據創建索引,記錄(key, 文件名, 偏移[offset])。然后進行map,讀取Join左邊的表,對於每一條數據記錄,根據前一步計算好的索引來查找數據,進行Join。
  可以在Join時使用Using 'merge'語句來觸發Merge Join,需要創建索引的表放置在右端。
  另外,在進行Join之前,首先過濾掉key為Null的數據記錄可以減少Join的數據量。

 

11、Limit:

只取幾條數據查看

data = load 'inut';
first10 = limit data 10;

12、Sample:

用於抽樣樣本數據,會讀取所有的數據然后返回一定百分比的行數的數據。

data = load 'inut';
sample_data = sample data 0.1;

13、Parallel:

附加到任一個關系操作符后面,控制reduce 階段的並行。

data = load 'inut' as (a,b,c);
bya =group data by a parallel 10;  -- 觸發Mapreduce任務具有10個reducer。

14、flatten:

降低bag或tuple嵌套級別。

15、自定義函數UDF:

    注冊非pig內置的UDF:REGISTER '…….jar‘;

    define命令和UDF:define命令可用於為用戶的Java UDF定義一個別名,這樣用戶就不需要寫那么冗長的包名全路徑了,它也可以為用戶的UDF的構造函數提供參數。

    set:在pig腳本前面加上set ***; 這個命令可在Pig腳本的開頭來設置job的參數;

三、例子:

1、group使用

data.txt文件內容:

1,2,6

3,4,5

5,6,2

1,4,2

3,1,3

>> data = load 'data.txt' using PigStorage(',') as (a:int,b:int,c:int);

>> describe data;  [describe速度快,不需要去執行代碼]

data: {a: int,b: int,c: int}

 

 

 

 

1、使用三目運算符來替換空值

B = FOREACH A GENERATE ((col1 is null) ? -1 :col1)

-- 替換bag空值, 其中col1為bag,類型為{(int),(int)}
C = FOREACH A GENERATE ((col1 is null or IsEmpty(col1)) ? {(0)} :col1;

2、外連接JOIN:

LEFT:左邊的數據全量顯示

A = LOAD '1.txt' USING PigStorage('\t') AS (col1:int , col2:chararray);
B = LOAD '2.txt' USING PigStorage('\t') AS ( col1:int , col2:chararray);
C = JOIN A BY col1 LEFT , B BY col1;
DESCRIBE C;
DUMP C;

 

 3、合並文件A和B的數據:

A = LOAD 'A.txt';
B = LOAD 'B.txt';
C = UNION A,B;
DUMP C;

 

4、表示文件的第一個字段(第一列):$0;

5、pig統計文件的詞頻:TOKENIZE

-- 統計數據的行數

cd hdfs:///

A = LOAD '/logdata/2012*/*/nohup_*' AS (name:chararray) ;

B = GROUP A BY name;

C = FOREACH B GENERATE group, COUNT(A);

D = ORDER C BY ($1);

E = FILTER D BY $1 > 200;

dump E;

-- 統計單詞的個數

A = LOAD'/logdata/20130131/*/*' AS (line: chararray) ;

B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;

C = group B by word;

D = foreach C generate COUNT(B), group;

E = ORDER D BY ($0);

F = FILTER E BY $0> 200;

DUMP F;

TOKENIZE函數:https://www.w3cschool.cn/apache_pig/apache_pig_tokenize.html

flatten函數:https://blog.csdn.net/iteye_20817/java/article/details/82545911

flatten在英文的意思弄平整的意思,這個操作符在不同的場景有不同的功能。 

1. flatten tuple
flatten會把tuple內容打開,下面舉例:

-- A結構:(a, (b, c))
B = foreach A GENERATE $0, flatten($1)

B返回結果(a,b,c)
 

2. flatten bag
flatten會把bag內容打開,每個tuple是一行,即列轉換為行

-- A結構:({(b,c),(d,e)})
B = foreach A generate flatten($0)

B返回結果
(b,c)
(d,e)

3、若將一個bag完全拆包:flatten(BagToTuple(data))

數據group by之后的格式:g_data: {group: int,data: {(a: int,b: int,c: int)}}

(1,{(1,,),(1,4,2),(1,2,6)})
(3,{(3,1,9),(3,1,3),(3,4,5)})
(5,{(5,6,2)})

執行命令:C= foreach g_data generate BagToTuple(data);

格式:C: {org.apache.pig.builtin.bagtotuple_data_53: (a: int,b: int,c: int)}

((1,,,1,4,2,1,2,6))
((3,1,9,3,1,3,3,4,5))
((5,6,2))

 

執行命令:B = foreach g_data generate FLATTEN(BagToTuple(data));

格式:B: {org.apache.pig.builtin.bagtotuple_data_27::a: int,org.apache.pig.builtin.bagtotuple_data_27::b: int,org.apache.pig.builtin.bagtotuple_data_27::c: int}

(1,,,1,4,2,1,2,6)
(3,1,9,3,1,3,3,4,5)
(5,6,2)

4、例子:

http://www.voidcn.com/article/p-pmbsmbzj-btk.html

 

舉例子:

1.txt;

    i am hadoop  
    i am hadoop  
    i am lucene  
    i am hbase  
    i am hive  
    i am hive sql  
    i am pig 

pig代碼:

--load文本的txt數據,並把每行作為一個文本  
a = load '1.txt' as (f1:chararray);  
--將每行數據,按指定的分隔符(這里使用的是空格)進行分割,並轉為扁平結構  
b = foreach a generate flatten(TOKENIZE(f1, ' '));  
--對單詞分組  
c = group b by $0;  
--統計每個單詞出現的次數  
d = foreach c generate group ,COUNT($1);  
--存儲結果數據  
stroe d into '$out'  

##注意,COUNT函數一定要大寫,不然會報錯: ERROR org.apache.pig.PigServer- exception during parsing:Error during parsing. Could not resolve count using imports:[, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]

處理的結果:

    (i,7)  
    (am,7)  
    (pig,1)  
    (sql,1)  
    (hive,2)  
    (hbase,1)  
    (hadoop,2)  
    (lucene,1)  

取topN功能:

-- 按統計次數降序  
e = order d by $1 desc;  
--取top2  
f = limit e 2;  
--存儲結果數據  
stroe f into '$out'  

 6、pig嵌套循環

 https://blog.csdn.net/jameshadoop/article/details/24838915

 7、pig傳參

A = LOAD '$INPUT_DIR' AS (t0:long, msisdn:chararray, t2:chararray, t3:chararray, t4:chararray,t5:chararray, t6:long, t7:long, t8:long, t9:long, t10:chararray);
B = FOREACH A GENERATE msisdn, t6, t7, t8, t9;
C = GROUP B BY msisdn;
D = FOREACH C GENERATE  group, SUM(B.t6), SUM(B.t7), SUM(B.t8), SUM(B.t9);
STORE D INTO '$OUTPUT_DIR';


pig -p INPUT_DIR=hdfs://mycluster/pig/in -p OUTPUT_DIR=hdfs://mycluster/pig/out  ./schedule.pig
--------------------- 
原文:https://blog.csdn.net/aaronhadoop/article/details/44310633  


PIG 命令行傳多個參數

PIG 命令行執行腳本,多個參數傳遞問題終於解決了,實例如下:

pig -p startdate=2011-03-21 -p enddate=2011-03-28 script.pig

這樣就可以實現多個參數傳遞的例子,但其中,如果參數值中存在空格,則會報錯,
原文:https://blog.csdn.net/iteye_19679/article/details/82580903  

8、兩列相除:

# 兩個整數相除,如何得到一個float
A = LOAD '16.txt' AS (col1:int, col2:int);
B = FOREACH A GENERATE (float)col1/col2;
DUMP B;
# 注意先轉型在計算,而不是(float)(col1/col2);

 

9、filter正則匹配:

 https://www.cnblogs.com/lishouguang/p/4559300.html

 
1)等值比較
filter data by $0 == 1
filter data by $0 != 1
 
2)字符串 正則匹配  JAVA的正則表達式
字符串以CM開頭
filter data by $0 matches 'CM.*';

字符串包含CM
filter data by $0 matches '.*CM.*';

 
3)not
filter data by not $0==1;
filter data by not $0 matches '.*CM.*';

   
4)NULL處理
filter data by $0 is not null;

   
5)UDF
filter data by isValidate($0);

   
6)and or
filter data by $0!=1 and $1>10

 

10、修改Pig作業執行的queue

作業提交到的隊列:mapreduce.job.queuename

作業優先級:mapreduce.job.priority,優先級默認有5個:LOW VERY_LOW NORMAL(默認) HIGH VERY_HIGH
1、靜態設置
1.1 Pig版本

SET mapreduce.job.queuename root.etl.distcp;

SET mapreduce.job.priority HIGH;
---------------------  
作者:wisgood  來源:CSDN  
原文:https://blog.csdn.net/wisgood/article/details/39075883  

 https://my.oschina.net/crxy/blog/420227?p=1

基本用法
1
2
3
a = load  'input1' ;
b = load  'input2' ;
jnd = join a by  $0 , b by  $1 ;

   

多字段連接
1
2
3
a = load  'input1'  as (username, age, city);
b = load  'input2'  as (orderid, user, city);
jnd = join a by (username, city), b by (user, city);

   

:: join后的字段引用
1
2
3
4
a = load  'input1'  as (username, age, address);
b = load  'input2'  as (orderid, user, money;
jnd = join a by username, b by user;
result = foreach jnd generate a: :username , a: :age , address, b: :orderid ;

   

多數據集連接
1
2
3
4
a = load  'input1'  as (username, age);
b = load  'input2'  as (orderid, user);
c = load  'input3'  as (user, acount);
jnd = join a by username, b by user, c by user;

   

外連接 僅限兩個數據集
1
2
3
4
5
a = load  'input1'  as (username, age);
b = load  'input2'  as (orderid, user);
jnd = join a by username left outer, b by user;
jnd = join a by username right, b by user;
jnd = join a by username full, b by user;

  

自連接 需要加載自身數據集兩次,使用不同的別名
1
2
3
a = load  'data'  as (node, parentid, name);
b = load  'data'  as (node, parentid, name);
jnd = join a by node, b by parentid;

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM