[Original] Advanced Pig Latin Syntax (Hadoop Pig)


These are the author's notes from reading Programming Pig. If you repost, please credit the source: http://www.cnblogs.com/siwei1988/archive/2012/08/06/2624912.html

- Pig Latin is a dataflow language. Alias names follow the same rules as Java variable names. An alias may be reused (not recommended: reusing a name effectively creates a new relation and discards the old one).

A = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
A = filter A by dividends > 0;
A = foreach A generate UPPER(symbol);

- Comments: -- starts a single-line comment; /* ... */ delimits a multi-line comment.
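
Both styles in one trivial sketch:

/* a multi-line comment:
   load the dividend file */
divs = load 'NYSE_dividends'; -- a single-line comment runs to the end of the line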

- Pig Latin keywords are case-insensitive (load, foreach, etc.), but alias and UDF names are case-sensitive: COUNT is a UDF, so it is not the same as count.
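
For example, this sketch runs identically with lowercase keywords, but COUNT must stay uppercase:

A    = LOAD 'NYSE_dividends' AS (exchange, symbol, date, dividends);
grpd = GROUP A ALL;
cnt  = FOREACH grpd GENERATE COUNT(A); -- writing count(A) would fail: UDF names are case-sensitive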

- load: reading data

By default, load reads from the current user's home directory (/users/yourlogin); you can change the working directory with the cd command in grunt.
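
For example, in the grunt shell (the target directory is illustrative):

grunt> cd /data/examples
grunt> pwd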

divs = load '/data/examples/NYSE_dividends';

You can also spell out the full location:

divs = load 'hdfs://nn.acme.com/data/examples/NYSE_dividends';

The default field separator is TAB (\t); use using to specify a different separator:

divs = load 'NYSE_dividends' using PigStorage(',');

Note: the separator must be a single character.

using can also name a different load function:

divs = load 'NYSE_dividends' using HBaseStorage();

as defines a schema:

divs = load 'NYSE_dividends' as (exchange, symbol, date, dividends);

Globs can also be used to load every file in a directory (files in all subdirectories are loaded too). Which globs are valid is determined by the underlying Hadoop filesystem; the table below lists the globs supported by Hadoop 0.20.

Glob      Meaning
?         Matches any single character.
*         Matches zero or more characters.
[abc]     Matches a single character from the character set {a, b, c}.
[a-z]     Matches a single character from the character range a..z, inclusive. The
          first character must be lexicographically less than or equal to the second.
[^abc]    Matches a single character that is not in the character set {a, b, c}. The
          ^ must occur immediately after the opening bracket.
[^a-z]    Matches a single character that is not in the character range a..z,
          inclusive. The ^ must occur immediately after the opening bracket.
\c        Removes (escapes) any special meaning of character c.
{ab,cd}   Matches a string from the string set {ab, cd}.
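
For example, to load a month's worth of daily files in one statement (the file names are hypothetical):

-- assumes files named NYSE_daily_2009-12-01, NYSE_daily_2009-12-02, ...
daily = load 'NYSE_daily_2009-12-*';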

- as defines a schema; it can be used with load 'file' [as (column[:type], ...)] and with foreach ... generate column [as newName].
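
A typed schema looks like this (fields left untyped default to bytearray):

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
           date:chararray, dividends:float);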

- store writes data out; by default it uses PigStorage with TAB as the separator.

store processed into '/data/examples/processed';

A complete path such as hdfs://nn.acme.com/data/examples/processed also works.

Use using to pick another storage function or separator:

store processed into 'processed' using
      HBaseStorage();
store processed into 'processed' using PigStorage(',');

Note: the output is not a single file but a set of part files, one per reduce task.

- foreach ... generate with * and the range projection begin..end

* projects every field; it can also be passed to a UDF.

.. projects all fields from begin through end, inclusive:

prices = load 'NYSE_daily' as (exchange, symbol, date, open,
high, low, close, volume, adj_close);
beginning = foreach prices generate ..open; -- produces exchange, symbol, date, open
middle    = foreach prices generate open..close; -- produces open, high, low, close
end       = foreach prices generate volume..; -- produces volume, adj_close

Normally fields produced by foreach ... generate keep their original names and types, but fields generated by expressions do not; use as after the expression to name the result:

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
          date:chararray, dividends:float);
sym  = foreach divs generate symbol;
describe sym;

sym: {symbol: chararray}
divs     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
in_cents = foreach divs generate dividends * 100.0 as dividend, dividends * 100.0; 
describe in_cents;

in_cents: {dividend: double,double}

- # performs a map lookup; . projects a field from a tuple:

bball = load 'baseball' as (name:chararray, team:chararray,
          position:bag{t:(p:chararray)}, bat:map[]);
avg = foreach bball generate bat#'batting_average';
A = load 'input' as (t:tuple(x:int, y:int));
B = foreach A generate t.x, t.$1;

- Projecting data out of a bag:

A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.x;
A = load 'input' as (b:bag{t:(x:int, y:int)});
B = foreach A generate b.(x, y);

The following statement will not run:

A = load 'foo' as (x:chararray, y:int, z:int);
B = group A by x; -- produces bag A containing all the records for a given value of x
C = foreach B generate SUM(A.y + A.z);

because A.y and A.z are both bags, and the + operator is not defined for bags.

The correct approach is:

A = load 'foo' as (x:chararray, y:int, z:int);
A1 = foreach A generate x, y + z as yz;
B = group A1 by x;
C = foreach B generate SUM(A1.yz);

- Nesting other statements inside foreach

--distinct_symbols.pig
daily    = load 'NYSE_daily' as (exchange, symbol); -- not interested in other fields
grpd     = group daily by exchange;
uniqcnt  = foreach grpd {
           sym      = daily.symbol;
           uniq_sym = distinct sym;
           generate group, COUNT(uniq_sym);
};

Note: only distinct, filter, limit, and order are supported inside foreach; the last statement must be generate.

--double_distinct.pig
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray);
grpd = group divs all;
uniq = foreach grpd {
            exchanges      = divs.exchange;
            uniq_exchanges = distinct exchanges;
            symbols        = divs.symbol;
            uniq_symbols   = distinct symbols;
            generate COUNT(uniq_exchanges), COUNT(uniq_symbols);
};

- flatten removes bag nesting

--flatten.pig
players = load 'baseball' as (name:chararray, team:chararray,
            position:bag{t:(p:chararray)}, bat:map[]);
pos     = foreach players generate name, flatten(position) as position;
bypos   = group pos by position;

 

Note: when position is null or an empty bag, flatten produces no output record at all for that player. The next script substitutes a placeholder bag so those records are kept:

--flatten_noempty.pig
players = load 'baseball' as (name:chararray, team:chararray,
            position:bag{t:(p:chararray)}, bat:map[]);
noempty = foreach players generate name,
            ((position is null or IsEmpty(position)) ? {('unknown')} : position)
            as position;
pos     = foreach noempty generate name, flatten(position) as position;
bypos   = group pos by position;

- filter (note: boolean expressions in Pig short-circuit)

Note: comparing against null yields null rather than true or false, so rows with a null in the comparison are dropped by both a filter condition and its negation.
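
A minimal filter over the dividend data (same schema as the earlier examples):

divs     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
               date:chararray, dividends:float);
-- rows where dividends is null make the predicate null and are dropped
positive = filter divs by dividends > 0;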

- filter with matches uses regular expressions (put not before matches to negate)

Pig regular expressions use Java regex syntax; see http://docs.oracle.com/javase/6/docs/api/java/util/regex/Pattern.html

Escape sequences: to match a metacharacter literally, write \\u followed by its code point (the backslash is doubled because Pig string literals also use backslash escapes):

.   ==> \\u002E
$   ==> \\u0024
^   ==> \\u005E
{   ==> \\u007B
[   ==> \\u005B
(   ==> \\u0028
|   ==> \\u007C
)   ==> \\u0029
*   ==> \\u002A
+   ==> \\u002B
?   ==> \\u003F
\   ==> \\u005C

The following example finds records whose symbol does not contain "CM." (note the escaped dot):

-- filter_not_matches.pig
divs           = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                    date:chararray, dividends:float);
notstartswithcm = filter divs by not symbol matches '.*CM\\u002E.*';

 

- Conceptually, the result of group is a map: the key is the value of the grouping expression, and the value is a bag of all records sharing that key.

Use parentheses to group on multiple keys at once, and group ... all to put everything in one group (note: no by with all). Each grouped record has two fields: the first is named group (this cannot be changed); the second is a bag with the same schema as the input relation, named after it.

--twokey.pig
daily = load 'NYSE_daily' as (exchange, stock, date, dividends);
grpd  = group daily by (exchange, stock);
avg   = foreach grpd generate group, AVG(daily.dividends);
describe grpd;
grpd: {group: (exchange: bytearray,stock: bytearray),daily: {exchange: bytearray,
    stock: bytearray,date: bytearray,dividends: bytearray}}
--countall.pig
daily = load 'NYSE_daily' as (exchange, stock);
grpd  = group daily all;
cnt   = foreach grpd generate COUNT(daily);

- cogroup groups several relations together

Note: records whose key is null are all gathered into the same group. This matches the behavior of group but differs from join, where null keys never match.

A = load 'input1' as (id:int, val:float);
B = load 'input2' as (id:int, val2:int);
C = cogroup A by id, B by id;
describe C;

C: {group: int,A: {id: int,val: float},B: {id: int,val2: int}}

- order by

Sorting on a single column:

--order.pig
daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float, close:float,
            volume:int, adj_close:float);
bydate  = order daily by date;

Sorting on multiple columns:

--order2key.pig
daily          = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
                    date:chararray, open:float, high:float, low:float,
                    close:float, volume:int, adj_close:float);
bydatensymbol  = order daily by date, symbol;

The desc keyword sorts in descending order; null sorts lower than any other value:

--orderdesc.pig
daily    = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float, close:float,
            volume:int, adj_close:float);
byclose  = order daily by close desc, open;
dump byclose; -- open still sorted in ascending order

- distinct removes duplicate records as a whole; it cannot deduplicate on selected columns, so project those columns first:

--distinct.pig
-- find a distinct list of ticker symbols for each exchange
-- This load will truncate the records, picking up just the first two fields.
daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray);
uniq    = distinct daily;

- join / left outer join / right outer join

Null keys never match, not even other nulls.

-- join2key.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd   = join daily by (symbol, date), divs by (symbol, date);
--leftjoin.pig
daily = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
divs  = load 'NYSE_dividends' as (exchange, symbol, date, dividends);
jnd   = join daily by (symbol, date) left outer, divs by (symbol, date);

You can also join several relations at once, but only as an inner join:

A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;

A relation can be joined with itself, but its data must be loaded twice:

--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends);
divs2     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends);
jnd       = join divs1 by symbol, divs2 by symbol;
increased = filter jnd by divs1::date < divs2::date and
                divs1::dividends < divs2::dividends;

The following will not work:

--selfjoin.pig
-- For each stock, find all dividends that increased between two dates
divs1     = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends);
jnd       = join divs1 by symbol, divs1 by symbol;
increased = filter jnd by divs1::date < divs2::date and
                divs1::dividends < divs2::dividends;

- union corresponds to SQL's UNION, but unlike SQL it accepts inputs whose schemas differ: if the two schemas are identical, the result keeps that schema; if one schema can be produced from the other by implicit casts, the result takes the widened schema; otherwise the result has no schema.

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;

C: {x: int,y: float}

A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;

C: {x: double,y: double}

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;

Schema for C unknown.

Note: in Pig 1.0 this last form of union cannot be executed.

Note: union does not remove duplicate rows.
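
To emulate SQL's duplicate-removing UNION, follow union with distinct (a sketch):

A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
D = distinct C; -- now behaves like SQL's UNION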

To union two relations whose column names differ, use the onschema keyword:

A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;

C: {w: chararray,x: int,y: double,z: chararray}

- cross produces the cross product, as in relational algebra: with inputs of m rows and n rows, the output has m*n rows.

--thetajoin.pig
--I recommend running this one on a cluster too
daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float,
            close:float, volume:int, adj_close:float);
divs    = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
crossed = cross daily, divs;
tjnd    = filter crossed by daily::date < divs::date;

 

- limit

--limit.pig
divs    = load 'NYSE_dividends';
first10 = limit divs 10;

Apart from order by, nothing in Pig guarantees row order, so the script above may return different rows on each run.
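
To get a deterministic result, order the data before limiting (a sketch):

divs  = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
            date:chararray, dividends:float);
srtd  = order divs by dividends desc;
top10 = limit srtd 10; -- always the 10 largest dividends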

- sample selects a fraction of the data, useful for building test data sets; the script below keeps roughly 10% of the records.

--sample.pig
divs = load 'NYSE_dividends';
some = sample divs 0.1;

- parallel sets the number of reduce tasks

--parallel.pig
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
bysymbl = group daily by symbol parallel 10;

parallel applies only to the statement it is attached to. To give every statement in the script 10 reduce tasks, use the set default_parallel 10 command:

--defaultparallel.pig
set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
            volume, adj_close);
bysymbl = group daily by symbol;
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;

If parallel and set default_parallel are both given, the per-statement parallel value overrides the default.
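
For example, in this sketch the group runs with 20 reduce tasks while the order falls back to the default of 10:

set default_parallel 10;
daily   = load 'NYSE_daily' as (exchange, symbol, date, open, high, low, close,
              volume, adj_close);
bysymbl = group daily by symbol parallel 20; -- overrides the default for this statement
average = foreach bysymbl generate group, AVG(daily.close) as avg;
sorted  = order average by avg desc;         -- uses the default of 10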

- UDFs

Registering a UDF:

--register.pig
register 'your_path_to_piggybank/piggybank.jar';
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
backwards = foreach divs generate
                org.apache.pig.piggybank.evaluation.string.Reverse(symbol);

Giving a UDF an alias with define:

--define.pig
register 'your_path_to_piggybank/piggybank.jar';
define reverse org.apache.pig.piggybank.evaluation.string.Reverse();
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
backwards = foreach divs generate reverse(symbol);

A UDF whose constructor takes arguments:

--define_constructor_args.pig
register 'acme.jar';
define convert com.acme.financial.CurrencyConverter('dollar', 'euro');
divs      = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,
                date:chararray, dividends:float);
backwards = foreach divs generate convert(dividends);

- Invoking static Java methods directly (less efficient than a regular UDF)

--invoker.pig
define hex InvokeForString('java.lang.Integer.toHexString', 'int');
divs  = load 'NYSE_daily' as (exchange, symbol, date, open, high, low,
            close, volume, adj_close);
nonnull = filter divs by volume is not null;
inhex = foreach nonnull generate symbol, hex((int)volume);

If the method takes an array argument, pass it a bag:

define stdev InvokeForDouble('com.acme.Stats.stdev', 'double[]');
A = load 'input' as (id: int, dp:double);
B = group A by id;
C = foreach B generate group, stdev(A.dp);

- multiquery: one script can store several outputs; Pig executes the shared part of the pipeline only once.

--multiquery.pig
players    = load 'baseball' as (name:chararray, team:chararray,
                position:bag{t:(p:chararray)}, bat:map[]);
pwithba    = foreach players generate name, team, position,
                bat#'batting_average' as batavg;
byteam     = group pwithba by team;
avgbyteam  = foreach byteam generate group, AVG(pwithba.batavg);
store avgbyteam into 'by_team';
flattenpos = foreach pwithba generate name, team,
                flatten(position) as position, batavg;
bypos      = group flattenpos by position;
avgbypos   = foreach bypos generate group, AVG(flattenpos.batavg);
store avgbypos into 'by_position';

- split sends each record to every branch whose condition it satisfies; a record may reach several outputs, or none:

wlogs = load 'weblogs' as (pageid, url, timestamp);
split wlogs into apr03 if timestamp < '20110404',
          apr02 if timestamp < '20110403' and timestamp > '20110401',
          apr01 if timestamp < '20110402' and timestamp > '20110331';
store apr03 into '20110403';
store apr02 into '20110402';
store apr01 into '20110401';

- set: configuring the Pig environment

Parameter         Value type  Description
debug             string      Sets the logging level to DEBUG. Equivalent to passing
                              -debug DEBUG on the command line.
default_parallel  integer     Sets a default parallel level for all reduce operations
                              in the script. See "parallel" above for details.
job.name          string      Assigns a name to the Hadoop job. By default the name is
                              the filename of the script being run, or a randomly
                              generated name for interactive sessions.
job.priority      string      If your Hadoop cluster uses the Capacity Scheduler with
                              priorities enabled for queues, this sets the priority of
                              your Pig job. Allowed values: very_low, low, normal,
                              high, very_high.
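
For example, at the top of a script (the values are illustrative):

set job.name 'NYSE dividend analysis';
set default_parallel 10;
set job.priority 'high';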

- Parameters: passing values into a Pig script

--daily.pig
daily     = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
                date:chararray, open:float, high:float, low:float, close:float,
                volume:int, adj_close:float);
yesterday = filter daily by date == '$DATE';
grpd      = group yesterday all;
minmax    = foreach grpd generate MAX(yesterday.high), MIN(yesterday.low);

Pass parameters with -p, one -p per parameter:

pig -p DATE=2009-12-17 daily.pig

Parameters can also be kept in a file, one per line, with comments starting with #; pass the file with -m or -param_file:

The Pig script:

wlogs = load 'clicks/$YEAR$MONTH01' as (url, pageid, timestamp);

The parameter file:

#Param file
YEAR=2009-
MONTH=12-
DAY=17
DATE=$YEAR$MONTH$DAY

Run it:

pig -param_file daily.params daily.pig

Parameters can also be defined inside the script with %declare or %default; %default gives a parameter a default value, which can be overridden when needed.

Note: %declare and %default cannot be used in a script that is imported by another script unless that script is a macro; they work normally when the script is run on its own:
%default parallel_factor 10;
wlogs = load 'clicks' as (url, pageid, timestamp);
grp   = group wlogs by pageid parallel $parallel_factor;
cntd  = foreach grp generate group, COUNT(wlogs);
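
%declare works the same way, but its value takes precedence even over parameters passed in from outside; a sketch:

%declare DATE '2009-12-17';
daily     = load 'NYSE_daily' as (exchange, symbol, date, open, high, low,
                close, volume, adj_close);
yesterday = filter daily by date == '$DATE';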

- Macros, defined with define, act like subroutines:

--macro.pig
-- Given daily input and a particular year, analyze how
-- stock prices changed on days dividends were paid out.
define dividend_analysis (daily, year, daily_symbol, daily_open, daily_close)
returns analyzed {
    divs          = load 'NYSE_dividends' as (exchange:chararray,
                        symbol:chararray, date:chararray, dividends:float);
    divsthisyear  = filter divs by date matches '$year-.*';
    dailythisyear = filter $daily by date matches '$year-.*';
    jnd           = join divsthisyear by symbol, dailythisyear by $daily_symbol;
    $analyzed     = foreach jnd generate dailythisyear::$daily_symbol,
                        $daily_close - $daily_open;
};

daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float, close:float,
            volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');

- import includes another Pig file. The imported file is executed once, as if its text were spliced into the calling script; imported files may not define their own parameters.

--main.pig
import '../examples/ch6/dividend_analysis.pig';

daily   = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,
            date:chararray, open:float, high:float, low:float, close:float,
            volume:int, adj_close:float);
results = dividend_analysis(daily, '2009', 'symbol', 'open', 'close');

By default, imports are searched for in the current directory; use set pig.import.search.path to define a search path:

set pig.import.search.path '/usr/local/pig,/grid/pig';
import 'acme/macros.pig';
