These notes were taken while reading Programming Pig. Please credit the source when reposting: http://www.cnblogs.com/siwei1988/archive/2012/08/06/2624912.html
。Pig Latin is a dataflow language. Alias (variable) names follow the same rules as Java identifiers. An alias may be reused (not recommended: reusing a name effectively creates a new relation and discards the old one).
A = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A = filter A by dividends > 0;
A = foreach A generate UPPER(symbol);
。Comments: -- starts a single-line comment; /* … */ delimits a multi-line comment.
。Pig Latin keywords are case-insensitive (load, foreach, etc.), but alias and UDF names are case-sensitive: COUNT is a UDF, so count is not the same name.
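A small sketch of this, reusing the NYSE_dividends file from the examples below; every keyword here could also be written in lower case, but COUNT must keep its exact case:
DIVS = LOAD 'NYSE_dividends' AS (exchange, symbol, date, dividends);
grpd = GROUP DIVS ALL;
cnt = FOREACH grpd GENERATE COUNT(DIVS); -- count(DIVS) would not resolve to the built-in UDF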
。load: loading data
Relative paths are resolved against the current user's home directory (/users/yourlogin) by default; you can change the working directory with the cd command in grunt.
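For instance, a grunt session might look like this (the directory is hypothetical):
grunt> cd /data/examples
grunt> divs = load 'NYSE_dividends';
An absolute path works without the cd: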
divs = load '/data/examples/NYSE_dividends'
You can also give a complete URL:
divs = load 'hdfs://nn.acme.com/data/examples/NYSE_dividends'
Tab (\t) is the default field separator; using can specify a different one:
divs = load 'NYSE_dividends' using PigStorage(',');
Note: the separator must be a single character.
using can also name a different load function:
divs = load 'NYSE_dividends' using HBaseStorage();
as defines a schema:
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A glob can load every file in a directory, including files in its subdirectories. Glob semantics are determined by the underlying Hadoop file system; the table below shows the globs supported by Hadoop 0.20.
Glob | Comment |
---|---|
? | Matches any single character. |
* | Matches zero or more characters. |
[abc] | Matches a single character from character set (a,b,c). |
[a-z] | Matches a single character from the character range (a..z), inclusive. The first character must be lexicographically less than or equal to the second character. |
[^abc] | Matches a single character that is not in the character set (a, b, c). The ^ character must occur immediately to the right of the opening bracket. |
[^a-z] | Matches a single character that is not from the character range (a..z) inclusive. The ^ character must occur immediately to the right of the opening bracket. |
\c | Removes (escapes) any special meaning of character c. |
{ab,cd} | Matches a string from the string set {ab, cd} |
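For instance, assuming hypothetical daily directories clicks/2009-12-01 through clicks/2009-12-31, the whole month can be loaded in one statement:
wlogs = load 'clicks/2009-12-*' as (url, pageid, timestamp);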
。as defines a schema; it appears in load … [as (ColumnName[:type])] and in foreach…generate ColumnName [as newColumnName].
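A two-line sketch of both uses (the rename to ticker is just for illustration):
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
renamed = foreach divs generate symbol as ticker;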
。store: storing data. By default it uses PigStorage with tab as the separator.
store processed into '/data/examples/processed';
You can also give a complete path, e.g. hdfs://nn.acme.com/data/examples/processed.
using can specify another storage function or a different separator:
store processed into 'processed' using HBaseStorage();
store processed into 'processed' using PigStorage(',');
Note: the output is not a single file; it is a set of part files, one per reduce task.
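For example, if the store above ran with two reduce tasks, listing /data/examples/processed would typically show files such as part-00000 and part-00001 (exact naming varies with the Hadoop version).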
。foreach…generate [*] [begin..end]
* projects every field, and can also be passed to a UDF;
begin..end projects the fields from begin through end, inclusive:
prices = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
beginning = foreach prices generate ..open; -- produces exchange, symbol, date, open
middle = foreach prices generate open..close; -- produces open, high, low, close
end = foreach prices generate volume..; -- produces volume, adj_close
Normally foreach…generate keeps the original field names and types, but an expression produces an unnamed result; use as after the expression to give it an alias:
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
sym = foreach divs generate symbol;
describe sym;
sym: {symbol: chararray}
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
in_cents = foreach divs generate dividends * 100.0 as dividend, dividends * 100.0;
describe in_cents;
in_cents: {dividend: double,double}
# looks up a key in a map; . projects fields out of a tuple:
bball = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
avg = foreach bball generate bat#'batting_average';
A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;
Projecting data out of a bag:
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.x;
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.(x, y);
The following will not run:
A = load 'foo' as (x:chararray, y:int, z:int);
B = group A by x; -- produces bag A containing all the records for a given value of x
C = foreach B generate SUM(A.y + A.z);
because A.y and A.z are both bags, and + is not defined on bags.
The correct approach is to add first, then group and sum:
A = load 'foo' as (x:chararray, y:int, z:int);
A1 = foreach A generate x, y + z as yz;
B = group A1 by x;
C = foreach B generate SUM(A1.yz);
。Nesting other statements inside foreach
--distinct_symbols.pig
daily = load 'NYSE_daily' as (exchange, symbol); -- not interested in other fields
grpd = group daily by exchange;
uniqcnt = foreach grpd {
    sym = daily.symbol;
    uniq_sym = distinct sym;
    generate group, COUNT(uniq_sym);
};
Note: only distinct, filter, limit, and order are supported inside foreach, and the final statement must be generate.
--double_distinct.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray);
grpd = group divs all;
uniq = foreach grpd {
    exchanges = divs.exchange;
    uniq_exchanges = distinct exchanges;
    symbols = divs.symbol;
    uniq_symbols = distinct symbols;
    generate COUNT(uniq_exchanges), COUNT(uniq_symbols);
};
。flatten removes a level of bag nesting
--flatten.pig
players = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
pos = foreach players generate name, flatten(position) as position;
bypos = group pos by position;
--flatten_noempty.pig
players = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
noempty = foreach players generate name, ((position is null or IsEmpty(position)) ? {('unknown')} : position) as position;
pos = foreach noempty generate name, flatten(position) as position;
bypos = group pos by position;
。filter (note: boolean expressions in Pig short-circuit, as in Java)
Note: comparing null with anything (even another null) yields null, which filter treats as false, so such records are dropped by both a condition and its negation.
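A short sketch of this behavior, assuming some dividends values are null:
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends:float);
pos = filter divs by dividends > 0; -- rows with null dividends are dropped
nonpos = filter divs by not (dividends > 0); -- rows with null dividends are dropped here too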
。filter with matches applies a regular expression (prefix matches with not to keep non-matches)
Pig regular expressions use the same syntax as Java's; see http://docs.oracle.com/javase/6/docs/api/java/util/regex/Pattern.html
Special characters are escaped by writing \\ followed by the character's Unicode code:
. ==> \\u002E (dot)
$ ==> \\u0024 (dollar sign)
^ ==> \\u005E (caret)
{ ==> \\u007B (left brace)
[ ==> \\u005B (left bracket)
( ==> \\u0028 (left parenthesis)
| ==> \\u007C (vertical bar)
) ==> \\u0029 (right parenthesis)
* ==> \\u002A (asterisk)
+ ==> \\u002B (plus)
? ==> \\u003F (question mark)
\ ==> \\u005C (backslash)
The following example keeps records whose symbol does not contain "CM." (note the not):
--filter_not_matches.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
notstartswithcm = filter divs by not symbol matches '.*CM\\u002E.*';
。The result of group is key/value-like: each record has two parts. The first field, always named group (this cannot be changed), holds the grouping key; the second is a bag, named after the original relation, with the original schema. Group on multiple fields by wrapping them in parentheses; group…all puts the whole relation in a single group (note: there is no by with all).
--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd = group daily by (exchange, stock);
avg = foreach grpd generate group, AVG(daily.dividends);
describe grpd;
grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,
stock: bytearray,date: bytearray,dividends: bytearray}}
--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd = group daily all;
cnt = foreach grpd generate COUNT(daily);
。cogroup groups several relations by key in one operation
Note: records whose key is null are all collected into the same group, as with group but unlike join.
A = load 'input1' as (id:int, val:float);
B = load 'input2' as (id:int, val2:int);
C = cogroup A by id, B by id;
describe C;
C: {group: int,A: {id: int,val: float},B: {id: int,val2: int}}
。order by
Sorting on a single column:
--order.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
bydate = order daily by date;
Sorting on multiple columns:
--order2key.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
bydatensymbol = order daily by date, symbol;
The desc keyword sorts that column in descending order (it applies only to the column it follows); null sorts lower than any other value.
--orderdesc.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
byclose = order daily by close desc, open;
dump byclose; -- open still sorted in ascending order
。distinct removes duplicates only over whole records; it cannot deduplicate on a subset of columns, so project the columns you care about first:
--distinct.pig
-- find a distinct list of ticker symbols for each exchange
-- This load will truncate the records, picking up just the first two fields.
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq = distinct daily;
。join / left join / right join
A null key does not match anything, not even another null.
--join2key.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date), divs by (symbol, date);
--leftjoin.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd = join daily by (symbol, date) left outer, divs by (symbol, date);
More than two relations can be joined at once, but only with inner joins:
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
A relation can also be joined with itself, but the data must be loaded twice:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
divs2 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
jnd = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and divs1::dividends < divs2::dividends;
The following does not work:
--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1 = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends);
jnd = join divs1 by symbol, divs1 by symbol;
increased = filter jnd by divs1::date < divs2::date and divs1::dividends < divs2::dividends;
。union is like SQL's UNION, except that Pig allows the two inputs to have different schemas: if the schemas are identical, the result has that schema; if one schema can be reached from the other by implicit casts, the result has the more general schema; otherwise the result has no schema.
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
C: {x: int,y: float}

A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: double,y: double}

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.
Note: in Pig 1.0 the last form of union cannot be executed.
Note: union does not remove duplicate rows.
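A minimal sketch if SQL-style deduplication is wanted, reusing A and B from the first example above:
C = union A, B;
D = distinct C; -- now equivalent to SQL's UNION rather than UNION ALL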
To union two relations whose fields differ by name, use the onschema keyword:
A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}
。cross computes the Cartesian product from set theory: with inputs of m and n rows, the output has m*n rows.
--thetajoin.pig
-- I recommend running this one on a cluster too
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
crossed = cross daily, divs;
tjnd = filter crossed by daily::date < divs::date;
。limit
--limit.pig
divs = load 'NYSE_dividends';
first10 = limit divs 10;
Apart from order by, nothing in Pig guarantees row order, so the rows returned by limit can differ from run to run.
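A sketch of getting a deterministic top 10: sort first, then limit (typing dividends so the comparison is numeric):
divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends:float);
sorted = order divs by dividends desc;
top10 = limit sorted 10; -- the ten largest dividends, in a fixed order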
。sample selects a random subset of the data, useful for generating test data; the argument is the fraction to keep. The following keeps roughly 10% of the rows.
--sample.pig
divs = load 'NYSE_dividends';
some = sample divs 0.1;
。parallel sets the number of reduce tasks
--parallel.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol parallel 10;
parallel applies only to the statement it is attached to; to give every statement in a script 10 reducers, use set default_parallel 10.
--defaultparallel.pig
set default_parallel 10;
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted = order average by avg desc;
If both are present, the per-statement parallel overrides set default_parallel.
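A sketch of the override, reusing the script above; this group runs with 2 reducers while everything else keeps the default of 10:
set default_parallel 10;
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
bysymbl = group daily by symbol parallel 2;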
。UDF
Registering a UDF:
--register.pig
register 'your_path_to_piggybank/piggybank.jar';
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate org.apache.pig.piggybank.evaluation.string.Reverse(symbol);
Defining an alias for a UDF:
--define.pig
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);
A UDF whose constructor takes arguments:
--define_constructor_args.pig
register 'acme.jar';
define convert com.acme.financial.CurrencyConverter('dollar', 'euro');
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
backwards = foreach divs generate convert(dividends);
。Invoking static Java methods through invokers (relatively slow)
--invoker.pig
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
divs = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close, volume, adj_close);
nonnull = filter divs by volume is not null;
inhex = foreach nonnull generate symbol, hex((int)volume);
If the method takes an array argument, pass it a bag:
define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');
A = load 'input' as (id: int, dp:double);
B = group A by id;
C = foreach B generate group, stdev(A.dp);
。multiquery
--multiquery.pig
players = load 'baseball' as (name:chararray, team:chararray, position:bag{t:(p:chararray)}, bat:map[]);
pwithba = foreach players generate name, team, position, bat#'batting_average' as batavg;
byteam = group pwithba by team;
avgbyteam = foreach byteam generate group, AVG(pwithba.batavg);
store avgbyteam into 'by_team';
flattenpos = foreach pwithba generate name, team, flatten(position) as position, batavg;
bypos = group flattenpos by position;
avgbypos = foreach bypos generate group, AVG(flattenpos.batavg);
store avgbypos into 'by_position';
。split
wlogs = load 'weblogs' as (pageid, url, timestamp);
split wlogs into apr03 if timestamp < '20110404',
                 apr02 if timestamp < '20110403' and timestamp > '20110401',
                 apr01 if timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';
。Setting Pig runtime parameters with set
Parameter | Value Type | Description |
---|---|---|
debug | string | Sets the logging level to DEBUG . Equivalent to passing -debug DEBUG on the command line. |
default_parallel | integer | Sets a default parallel level for all reduce operations in the script. See the section called “Parallel” for details. |
job.name | string | Assigns a name to the Hadoop job. By default the name is the filename of the script being run, or a randomly generated name for interactive sessions. |
job.priority | string | If your Hadoop cluster is using the Capacity Scheduler with priorities enabled for queues, this allows you to set the priority of your Pig job. Allowed values are very_low , low , normal , high , very_high . |
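For example (the job name string is arbitrary):
set job.name 'daily dividend analysis';
set default_parallel 10;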
。Parameters: passing values into a Pig script
--daily.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
yesterday = filter daily by date == '$DATE';
grpd = group yesterday all;
minmax = foreach grpd generate MAX(yesterday.high), MIN(yesterday.low);
Pass parameters with -p; each parameter needs its own -p flag:
pig -p DATE=2009-12-17 daily.pig
Parameters can also be kept in a file, one per line, with comments starting with #; pass the file with -m or -param_file.
Using a parameter file. The Pig script:
wlogs = load 'clicks/$YEAR$MONTH01' as (url, pageid, timestamp);
The parameter file:
#Param file
YEAR=2009-
MONTH=12-
DAY=17
DATE=$YEAR$MONTH$DAY
Run it with:
pig -param_file daily.params daily.pig
Parameters can also be defined inside the script with %declare or %default; %default sets a default value that can be overridden.
Note: %declare and %default cannot be used in a script (other than a macro) that is imported by another script; they work fine if the script is not imported.
%default parallel_factor 10;
wlogs = load 'clicks' as (url, pageid, timestamp);
grp = group wlogs by pageid parallel $parallel_factor;
cntd = foreach grp generate group, COUNT(wlogs);
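A %default can be overridden from the command line with -p; for instance (the script name is hypothetical):
pig -p parallel_factor=20 clicks.pig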
。Defining macros (roughly, subroutines)
--macro.pig
-- Given daily input and a particular year, analyze how
-- stock prices changed on days dividends were paid out.
define dividend_analysis (daily, year, daily_symbol, daily_open, daily_close) returns analyzed {
    divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);
    divsthisyear = filter divs by date matches '$year-.*';
    dailythisyear = filter $daily by date matches '$year-.*';
    jnd = join divsthisyear by symbol, dailythisyear by $daily_symbol;
    $analyzed = foreach jnd generate dailythisyear::$daily_symbol, $daily_close - $daily_open;
};
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');
。Importing a Pig file: the imported file is executed as if spliced into the importing script; the imported file may not define its own parameters (%declare/%default).
--main.pig
import '../examples/ch6/dividend_analysis.pig';
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');
Imports are searched for in the current directory by default; set pig.import.search.path to change the search path:
set pig.import.search.path '/usr/local/pig,/grid/pig';
import 'acme/macros.pig';