ODPS功能介紹之數據導入


ODPS功能介紹之數據導入

  在使用ODPS強大的數據處理能力之前,大家最關心的是自己的數據如何導入到ODPS中。下面介紹一款向ODPS導入數據的工具-Fluentd。

  Fluentd是一個開源的軟件,用來收集各種源頭日志(包括Application Log、Sys Log及Access Log),允許用戶選擇插件對日志數據進行過濾、並存儲到不同的數據處理端(包括MySQL、Oracle、MongoDB、Hadoop、Treasure Data、AWS Services、Google Services以及ODPS等)。Fluentd以小巧靈活而著稱,允許用戶自定義數據源、過濾處理及目標端等插件,目前在這款軟件中已經有300+個插件運行Fluentd的架構上,而且這些插件全部是開源的。 ODPS也在這款軟件上開源了數據導入插件。

  環境准備

  使用這款軟件,向ODPS導入數據,需要具備如下環境:

  Ruby 2.1.0 或更新

  Gem 2.4.5 或更新

  Fluentd-0.10.49 或從Fluentd 官網查找最新,Fluentd為不同的OS提供了不同的版本

  Protobuf-3.5.1 或更新(Rubyprotobuf)

  安裝導入插件

  接下來可以通過以下兩種方式中的任意一種來安裝ODPS Fluentd 導入插件。

  方式一:通過ruby gem安裝:

  復制代碼

  $ gem install fluent-plugin-neitui-odps

  ODPS已經將這個插件發布到GEM庫中, 名稱為fluent-plugin-neitui-odps,只需要通過gem install 命令來安裝即可(大家在使用gem 時在國內可能會遇到gem庫無法訪問,可以在網上搜一下更改gem 庫源來解決)。

  方式二:通過插件源碼安裝:

  復制代碼

  $ gem install protobuf

  $ gem install fluentd --no-ri --no-rdoc

  $ git clone https://github.com/neitui/neitui-odps-fluentd-plugin.git

  $ cp neitui-odps-fluentd-plugin/lib/fluent/plugin/* {YOUR_FLUENTD_DIRECTORY}/lib/fluent/plugin/ -r

  其中第二條命令是安裝fluentd,如果已經安裝可以省略。ODPS Fluentd插件源碼在github上,clone下來之后直接放到Fluentd的plugin目錄中即可。

  插件的使用

  使用Fluentd導入數據時,最主要的是配置Fluentd的conf文件,更多conf文件 的介紹請參見: http://docs.fluentd.org/articles/config-file

  示例一:導入Nginx日志 。Conf中source的配置如下:

  復制代碼

  <source>

  type tail

  path /opt/log/in/in.log

  pos_file /opt/log/in/in.log.pos

  refresh_interval 5s

  tag in.log

  format /^(?<remote>[^ ]*) - - \[(?<datetime>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*) "-" "(?<agent>[^\"]*)"$/

  time_format %Y%b%d %H:%M:%S %z

  </source>

  fluentd 以tail方式監控指定的文件內容是否有變化,更多的tail配置參見:http://docs.fluentd.org/articles/in_tail

  match 配置如下:

  復制代碼

  <match in.**>

  type neitui_odps

  neitui_access_id ************

  neitui_access_key *********

  neitui_odps_endpoint http://service.odps.neitui.com/api

  neitui_odps_hub_endpoint http://dh.odps.neitui.com

  buffer_chunk_limit 2m

  buffer_queue_limit 128

  flush_interval 5s

  project projectforlog

  <table in.log>

  table nginx_log

  fields remote,method,path,code,size,agent

  partition ctime=${datetime.strftime('%Y%m%d')}

  time_format %d/%b/%Y:%H:%M:%S %z

  </table>

  </match>

  數據會導入到projectforlog project的nginx_log表中,其中會以源中的datetime字段作為分區,插件遇到不同的值時會自動創建分區;

  示例二:導入MySqL中的數據。導入MySQL中數據時,需要安裝fluent-plugin-sql插件作為source:

  $ gem installfluent-plugin-sql

  配置conf中的source:

  復制代碼

  <source>

  type sql

  host 127.0.0.1

  database test

  adapter mysql

  username xxxx

  password xxxx

  select_interval 10s

  select_limit 100

  state_file /path/sql_state

  <table>

  table test_table

  tag in.sql

  update_column id

  </table>

  </source>

  這個例子是從test_table中SELECT數據,每間隔10s去讀取100條數據出來,SELECT 時將ID列作為主鍵(id字段是自增型)。關於fluent-plugin-sql的更多說明參見:https://github.com/fluent/fluent-plugin-sql

  match 配置如下:

  復制代碼

  <match in.**>

  type neitui_odps

  neitui_access_id ************

  neitui_access_key *********

  neitui_odps_endpoint http://service.odps.neitui.com/api

  neitui_odps_hub_endpoint http://dh.odps.neitui.com

  buffer_chunk_limit 2m

  buffer_queue_limit 128

  flush_interval 5s

  project your_projectforlog

  <table in.log>

  table mysql_data

  fields id,field1,field2,fields3

  </table>

  </match>

  數據會導出到ODPSprojectforlog project的mysql_data表中,導入的字段包括id,field1,field2,field3。

  關於導入表的說明

  通過Fluentd導入數據是走的ODPS實時數據流入通道-Datahub,這個通道需要一個特殊的ODPS表,這個表在創建時需要指定為Hub Table。創建表時可以使用如下語名:

  CREATE TABLE<table_name) (field_name type,…) PARTITIONED BY (pt_name type) INTO<n1> SHARDS HUBLIFECYCLE <n2>;

  其中:n1 是指shards數量,有效值為1-20。在導入數據時,每個shard的流入量是10M/秒。N2是指數據在Datahub上的保留期,有效值1-7,主要用於流計算場景中使用歷史數據。 例如:

  create table access_log(f1 string, f2 string,f3 string,f4 string,f5 string,f6 string, f7string) partitioned by(ctime string) into 5 shards hublifecycle 7;

  如果向已經存在的表導入數據,也需要將表修改為HUB表,其命令為:

  ALTER TABLE table_name ENABLE HUTTABLE with <n1> SHARDSHUBLIFECYCLE <n2>;

  插件參數說明

  向ODPS導入數據,需要將ODPS插件配置在conf文件中match項中。插件支持的參數說明如下:

  type(Fixed): 固定值neitui_odps.

  neitui_access_id(Required):雲賬號access_id.

  neitui_access_key(Required):雲賬號accesskey.

  neitui_odps_hub_endpoint(Required):如果你的服務部署在ESC上,請把本值設定為 http://dh-ext.odps.neitui-inc.com,否則設置為http://dh.odps.neitui.com.

  neituiodps_endpoint(Required):如果你的服務部署在ESC上,請把本值設定為 http://odps-ext.aiyun-inc.com/api,否則設置為http://service.odps.neitui.com/api .

  buffer_chunk_limit(Optional):塊大小,支持“k”(KB),“m”(MB),“g”(GB)單位,默認 8MB,建議值2MB.

  buffer_queue_limit(Optional):塊隊列大小,此值與buffer_chunk_limit共同決定整個緩沖區大小。

  flush_interval(Optional):強制發送間隔,達到時間后塊數據未滿則強制發送, 默認 60s.

  project(Required):project名稱.

  table(Required):table名稱.

  fields(Required): 與source對應,字段名必須存在於source之中.

  partition(Optional):若為分區表,則設置此項.

  分區名支持的設置模式:

  固定值: partitionctime=20150804

  關鍵字: partitionctime=${remote} (其中remote為source中某字段)

  時間格式關鍵字: partitionctime=${datetime.strftime('%Y%m%d')} (其中datetime為source中某時間格式字段,輸出為%Y%m%d格式作為分區名稱)

  time_format(Optional):如果使用時間格式關鍵字為<partition>,請設置本參數. 例如: source[datetime]="29/Aug/2015:11:10:16 +0800",則設置<time_format>為"%d/%b/%Y:%H:%M:%S%z"


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM