Hadoop集群(第10期)_MySQL關系數據庫


1、MySQL安裝

    MySQL下載地址:http://www.mysql.com/downloads/

1.1 Windows平台

  1)准備軟件

  MySQL版本:mysql-5.5.21-win32.msi

  2)安裝環境:

  操作系統:Windows 7旗艦版

  3)開始安裝

  第一步:雙擊"msi"安裝文件,出現如圖1.1-1界面——"MySQL安裝向導",按"Next"繼續。

 

圖1.1-1 MySQL安裝向導

  第二步:在"I accept …."前面勾上,同意協議,按"Next"按鈕繼續。

 

圖1.1-2 軟件協議

  第三步:選擇安裝類型,有"Typical(默認)"、"Custom(定制安裝)"、"Complete(完全)"三個選項。

  • 典型安裝:安裝只安裝MySQL服務器、mysql命令行客戶端和命令行實用程序。命令行客戶端和實用程序包括mysqldump、myisamchk和其它幾個工具來幫助你管理MySQL服務器。
  • 定制安裝:安裝允許你完全控制你想要安裝的軟件包和安裝路徑。
  • 完全安裝:安裝將安裝軟件包內包含的所有組件。完全安裝軟件包包括的組件包括嵌入式服務器庫、基准套件、支持腳本和文檔。

  我們選擇"Custom",有更多的選項,也方便熟悉安裝過程。

 

圖1.1-3 安裝類型

  第四步:選擇組件及更改文件夾位置。

 

圖1.1-4 自定義界面

  所有可用組件列入定制安裝對話框左側的樹狀視圖內。未安裝的組件用紅色X 圖標表示;已經安裝的組件有灰色圖標。要想更改組件,點擊該組件的圖標並從下拉列表中選擇新的選項。組件選擇了默認安裝位置我會更改一下,點擊Browse。

 

圖1.1-5 路徑選擇

  按"OK"按鈕返回,並按"Next"按鈕繼續。

  備注:安裝mysql的路徑中,不能含有中文

  第五步:確認一下先前的設置,如果有誤,按"Back"返回重做。按"Install"開始安裝。

 

圖1.1-6 准備安裝

  第六步:正在安裝中,請稍候……

 

圖1.1-7 正在安裝

  第七步:彈出一個頁面來,是關於介紹MySQL企業版的信息,沒有什么可操作的,按"Next"按鈕繼續。

 

圖1.1-8 MySQL企業版介紹

  然后彈出一個類似界面,接着按"Next"按鈕繼續。

  第八步:那個帶復選框的是"MySQL服務器實例配置向導",保持默認勾選,按"Finish"按鈕。至此,安裝過程已經結束了,但是還需要配置一下。

 

圖1.1-9 安裝結束

  第九步:MySQL配置向導啟動界面,按"Next"按鈕繼續。

 

圖1.1-10 配置向導

  MySQL Configuration Wizard(配置向導)可以幫助自動配置Windows中的服務器。MySQL Configuration Wizard(配置向導)問你一系列問題,然后將回答放到模板中生成一個my.ini文件,該文件與你的安裝一致。目前只適用於Windows用戶。

  一般情況當MySQL安裝幫助退出時,從MySQL安裝幫助啟動MySQL Configuration Wizard(配置向導)。還可以點擊Windows啟動菜單中MySQL服務器實例配置向導條目中的MySQL部分來啟動MySQL Configuration Wizard(配置向導)。並且,還可以進入MySQL安裝bin目錄直接啟動MySQLInstanceConfig.exe文件。

  第十步:選擇配置類型,可以選擇兩種配置類型:Detailed Configuration(詳細配置)和Standard Configuration(標准配置)。Standard Configuration(標准配置)選項適合想要快速啟動MySQL而不必考慮服務器配置的新用戶。詳細配置選項適合想要更加細粒度控制服務器配置的高級用戶。我們選擇"Detailed Configuration",方便熟悉配置過程

 

圖1.1-11 配置類型

  備注

  如果你是MySQL的新手,需要配置為單用戶開發機的服務器,Standard Configuration(標准配置)應當適合你的需求。選擇Standard Configuration(標准配置)選項,則 MySQL Configuration Wizard(配置向導)自動設置所有配置選項,但不包括服務選項和安全選項。

  Standard Configuration(標准配置)設置選項可能與安裝MySQL的系統不兼容。如果系統上已經安裝了MySQL和你想要配置的安裝,建議選擇詳細配置。

  第十一步:選擇服務器類型,可以選擇3種服務器類型,選擇哪種服務器將影響到MySQL Configuration Wizard(配置向導)對內存、硬盤和過程或使用的決策。

 

  • Developer Machine(開發機器):該選項代表典型個人用桌面工作站。假定機器上運行着多個桌面應用程序。將MySQL服務器配置成使用最少的系統資源。

 

  • Server Machine(服務器):該選項代表服務器,MySQL服務器可以同其它應用程序一起運行,例如FTP、email和web服務器。MySQL服務器配置成使用適當比例的系統資源。

 

  • Dedicated MySQL Server Machine(專用MySQL服務器):該選項代表只運行MySQL服務的服務器。假定運行沒有運行其它應用程序。MySQL服務器配置成使用所有可用系統資源。

 

大家根據自己的類型選擇了,一般選"Server Machine",不會太少,也不會占滿。我們選擇"Server Machine",按"Next"按鈕繼續。

 

圖1.1-12 服務器類型

  第十二步:選擇數據庫用途,通過Database Usage(數據庫使用)對話框,你可以指出創建MySQL表時使用的表處理器。通過該選項,你可以選擇是否使用InnoDB儲存引擎,以及InnoDB占用多大比例的服務器資源。"Multifunctional Database(通用多功能型,)"、"Transactional Database Only(服務器類型,專注於事務處理,一般)"、"Non-Transactional Database Only(非事務處理型,較簡單,主要做一些監控、記數用,對MyISAM 數據類型的支持僅限於non-transactional)",隨自己的用途而選擇了,一般選擇第一種多功能的。我們選擇"Multifunctional Database",按"Next"按鈕繼續。

 

圖1.1-13 數據庫用途

  第十三步:對InnoDB Tablespace 進行配置,就是為InnoDB 數據庫文件選擇一個存儲空間,如果修改了,要記住位置,重裝的時候要選擇一樣的地方,否則可能會造成數據庫損壞,當然,對數據庫做個備份就沒問題了,這里不詳述。這里沒有修改,使用用默認位置,按"Next"按鈕繼續。

 

圖1.1-14 配置InnoDB表空間

  第十四步:選擇MySQL允許的最大連接數,限制所創建的與MySQL服務器之間的並行連接數量很重要,以便防止服務器耗盡資源。在Concurrent Connections(並行連接)對話框中,可以選擇服務器的使用方法,並根據情況限制並行連接的數量。還可以手動設置並行連接的限制。第一種是最大20個連接並發數,第二種是最大500個並發連接數,最后一種是自定義。我們選擇"Online Transaction PRocessing(OLTP)",按"Next"按鈕繼續。

 

圖1.1-15 最大連接數

  第十五步:進行網絡配置,在Networking Options(網絡選項)對話框中可以啟用或禁用TCP/IP網絡,並配置用來連接MySQL服務器的端口號。默認情況啟用TCP/IP網絡。要想禁用TCP/IP網絡,取消選擇Enable TCP/IP Networking選項旁邊的檢查框。默認使用3306端口。要想更訪問MySQL使用的端口,從下拉框選擇一個新端口號或直接向下拉框輸入新的端口號。如果你選擇的端口號已經被占用,將提示確認選擇的端口號。我們保持默認選項,按"Next"按鈕繼續。

 

圖1.1-16 網絡配置

  第十六步:對數據庫語言編碼進行設置,非常重要,因為Hadoop里默認編碼為UTF-8,所以為了避免出現亂碼,我們這里選擇"UTF-8"作為MySQL數據庫的語言編碼。

 

圖1.1-17 數據庫編碼

  第十七步:是否要把MySQL設置成Windows的服務,一般選擇設成服務,這樣以后就可以通過服務中啟動和關閉mysql數據庫了。推薦:下面的復選框也勾選上,這樣,在cmd模式下,不必非到mysql的bin目錄下執行命令。我們全部打上了勾Service Name 不變。按"Next"按鈕繼續。

 

圖1.1-18 服務選項

  第十八步:設置MySQL的超級用戶密碼,這個超級用戶非常重要,對MySQL擁有全部的權限,請設置好並牢記超級用戶的密碼,下面有個復選框是選擇是否允許遠程機器用root用戶連接到你的MySQL服務器上面,如果有這個需求,也請勾選。我們這里的root用戶密碼設置為"hadoop",並勾上"允許遠程連接"復選框,按"Next"按鈕繼續。

 

圖1.1-19 安全選項

  備注

  • "Enable root access from remote machines(是否允許root 用戶在其它的機器上登陸,如果要安全,就不要勾上,如果要方便,就勾上它)"。
  • "Create An Anonymous Account(新建一個匿名用戶,匿名用戶可以連接數據庫,不能操作數據,包括查詢)",一般就不用勾了。

  第十九步:確認設置無誤,如果有誤,按"Back"返回檢查。如果沒有,按"Execute"使設置生效。

 

圖1.1-20 確認配置

  第二十步:設置完畢,按"Finish"按鈕結束MySQL的安裝與配置。

 

圖1.1-21 配置完成

  備注:這里有一個比較常見的錯誤,就是不能"Start service",一般出現在以前有安裝MySQL的服務器上,解決的辦法,先保證以前安裝的MySQL 服務器徹底卸載掉了;不行的話,檢查是否按上面一步所說,之前的密碼是否有修改,照上面的操作;如果依然不行,將MySQL 安裝目錄下的data 文件夾備份,然后刪除,在安裝完成后,將安裝生成的 data 文件夾刪除,備份的data 文件夾移回來,再重啟MySQL 服務就可以了,這種情況下,可能需要將數據庫檢查一下,然后修復一次,防止數據出錯。

  4)驗證成功

  第一種:打開任務管理器 看到MySQL服務是否已經啟動。

 

圖1.1-22 任務管理器

  第二種:"開始à啟動cmdà開打cmd模式",輸入"mysql –u root –p"連接數據庫。

 

圖1.1-23 連接數據庫

1.2 Linux平台

  1)准備軟件

  MySQL數據庫:MySQL-server-5.5.21-1.linux2.6.i386.rpm

  MySQL客戶端:MySQL-client-5.5.21-1.linux2.6.i386.rpm

  2)安裝環境:

  操作系統:CentOS6.0 Linux

  3)檢查安裝

  在安裝MySQL之前,先檢查CentOS系統中是否已經安裝了一個MySQL,如果已經安裝先卸載,不然會導致安裝的MySQL失敗

  用下面命令查看系統之前是否已安裝MySQL。

 

rpm -qa | grep mysql

 

  查看結果如下:

 

  

 

  從上圖得知,CentOS6.0系統自帶了一個MySQL,我們需要刪除這個老版本,用root用戶執行下面語句。

 

rpm -e --nodeps mysql-libs-5.1.47-4.el6.i686

 

  

 

  上圖中,我們先切換到"root"用戶下,然后執行刪除語句,刪除之后,我們再次查看,發現已經成功刪除了CentOS6.0自帶的舊MySQL版本。

  在刪除MySQL的rpm后,還要進行一些掃尾操作,網上有兩種操作。(備注:我在這里兩種都沒有用到,發現系統中並沒有其他殘余的MySQL信息。)

  第一種善后處理:使用下面命令進行處理。

 

rm -rf /var/lib/mysql*

rm -rf /usr/share/mysql*

 

  另一種善后處理:卸載后/var/lib/mysql中的/etc/my.cnf會重命名為my.cnf.rpmsave,/var/log/mysqld.log 會重命名為/var/log/mysqld.log.rpmsave,如果確定沒用后就手工刪除。

  4)開始安裝

  第一步:上傳所需軟件。通過"FlashFXP"軟件使用"vsftpd"上傳用到的兩個軟件到"/home/hadoop"目錄下。

 

  第二步:安裝MySQL服務端。用"root"用戶運行如下命令進行安裝:(備注:以下步驟都是用"root"用戶執行。)

 

rpm -ivh MySQL-server-5.5.21-1.linux2.6.i386.rpm

 

  通過SecureCRT查看如下:

 

  

  

  如出現如上信息,服務端安裝完畢。

  第三步:檢測MySQL 3306端口是否安打開。測試是否成功可運行netstat看MySQL端口是否打開,如打開表示服務已經啟動,安裝成功。MySQL默認的端口是3306。

 

netstat -nat

 

  

  從上圖中發現並沒有與"3306"有關的信息,說明"MySQL服務器"沒有啟動。通過下面命令啟動MySQL。

 

service mysql start

 

  

  從上圖中已經發現我們的MySQL服務器已經起來了。

  第四步:安裝MySQL客戶端。用下面命令進行安裝:

 

rpm -ivh MySQL-client-5.5.21-1.linux2.6.i386.rpm

 

  執行命令顯示如下:

  

  從上圖中顯示MySQL客戶端已經安裝完畢。

  第五步:MySQL的幾個重要目錄。MySQL安裝完成后不像SQL Server默認安裝在一個目錄,它的數據庫文件配置文件命令文件分別不同目錄,了解這些目錄非常重要,尤其對於Linux的初學者,因為 Linux本身的目錄結構就比較復雜,如果搞不清楚MySQL的安裝目錄那就無從談起深入學習。

  下面就介紹一下這幾個目錄。

 

a、數據庫目錄

/var/lib/mysql/

 

b、配置文件

/usr/share/mysql(mysql.server命令及配置文件)

 

c、相關命令

/usr/bin(mysqladmin mysqldump等命令)

 

d、啟動腳本

/etc/rc.d/init.d/(啟動腳本文件mysql的目錄)

如:/etc/rc.d/init.d/mysql start/restart/stop/status

 

  下面就分別展示上面的幾個目錄內容:

  • 數據庫目錄

 

  

 

  • 配置文件

 

  

 

  • 相關命令

  

 

  • 啟動腳本

 

  

 

  第六步:更改MySQL目錄。由於MySQL數據庫目錄占用磁盤比較大,而MySQL默認的數據文件存儲目錄為/"var/lib/mysql",所以我們要把目錄移到"/"根目錄下的"mysql_data"目錄中。

  需要以下幾個步驟:

  • "/"根目錄下建立"mysql_data"目錄

 

cd /

mkdir mysql_data

 

  

 

  • 把MySQL服務進程停掉

  可以用兩種方法:

 

service mysql stop

 

  或者

 

mysqladmin -u root -p shutdown

 

  

  從上圖中我們得知"MySQL服務進程"已經停掉。

  備注MySQL默認用戶名為"root",此處的"root"與Linux的最高權限用戶"root"不是一會兒,而且默認的用戶"root"的密碼為,所以上圖中讓輸入密碼,直接點擊回車即可。

    

  • 把"/var/lib/mysql"整個目錄移到"/mysql_data"

 

mv /var/lib/mysql /mysql_data

 

  

  這樣就把MySQL的數據文件移動到了"/mysql_data/mysql"下。

 

  • 找到my.cnf配置文件

  如果"/etc/"目錄下my.cnf配置文件,請到"/usr/share/mysql/"下找到*.cnf文件,拷貝其中一個合適的配置文件到"/etc/"並改名為"my.cnf"中。命令如下:

 

cp /usr/share/mysql/my-medium.cnf  /etc/my.cnf

 

  

  上圖中,下查看"/etc/"下面是否有"my.cnf"文件,發現沒有,然后通過上面的命令進行拷貝,拷貝完之后,進行查看,發現拷貝成功。

  備注:"/usr/share/mysql/"下有好幾個結尾為cnf的文件,它們的作用分別是。

 

  

 

a、my-small.cnf:是為了小型數據庫而設計的。不應該把這個模型用於含有一些常用項目的數據庫。

b、my-medium.cnf:是為中等規模的數據庫而設計的。如果你正在企業中使用RHEL,可能會比這個操作系統的最小RAM需求(256MB)明顯多得多的物理內存。由此可見,如果有那么多RAM內存可以使用,自然可以在同一台機器上運行其它服務。

c、my-large.cnf:是為專用於一個SQL數據庫的計算機而設計的。由於它可以為該數據庫使用多達512MB的內存,所以在這種類型的系統上將需要至少1GB的RAM,以便它能夠同時處理操作系統與數據庫應用程序。

d、my-huge.cnf:是為企業中的數據庫而設計的。這樣的數據庫要求專用服務器和1GB或1GB以上的RAM。

這些選擇高度依賴於內存的數量、計算機的運算速度、數據庫的細節大小、訪問數據庫的用戶數量以及在數據庫中裝入並訪問數據的用戶數量。隨着數據庫和用戶的不斷增加,數據庫的性能可能會發生變化。

 

  備注:這里我們根據實際情況,選擇了"my-medium.cnf"進行配置。

 

  • 編輯MySQL的配置文件"/etc/my.cnf"

  為保證MySQL能夠正常工作,需要指明"mysql.sock"文件的產生位置,以及默認編碼修改為UTF-8。用下面命令:

 

vim /etc /my.cnf

 

  

  需要修改和添加的內容如下:

    【client

 

socket = /mysql_data/mysql/mysql.sock

default-character-set=utf8

 

    【mysqld

 

socket = /mysql_data/mysql/mysql.sock

datadir         =/mysql_data/mysql

character-set-server=utf8

lower_case_table_names=1(注意linux下mysql安裝完后是默認:區分表名的大小寫,不區分列名的大小寫;lower_case_table_names = 0 0:區分大小寫,1:不區分大小寫)

 

  備注:【client】和【mysqld】設置的編碼時前地名稱不一樣。

 

  

 

  • 修改MySQL啟動腳本"/etc/rc.d/init.d/mysql"

  最后,需要修改MySQL啟動腳本/etc/rc.d/init.d/mysql,修改datadir=/mysql_data/mysql

 

vim /etc/rc.d/init.d/mysql

 

  

 

  

 

  • 重新啟動MySQL服務

 

service mysql start

 

  

  正准備高興時,發現MySQL啟動不了了,網上搜了一下午,各種都沒有解決。后來在一篇文章才得知又是"SELinux"惹得禍。解決辦法如下:

  打開/etc/selinux/config,把SELINUX=enforcing改為SELINUX=disabled后存盤退出重啟機器試試,必須要重啟,很關鍵。

 

  

 

  機器重啟之后,在把"mysql服務"啟動。

  

  第七步:修改登錄密碼。

  MySQL默認沒有密碼,安裝完畢增加密碼的重要性是不言而喻的。

  • 修改前,直接登錄

 

  

  在沒有添加密碼前,直接輸入"mysql"就能登錄到MySQL數據庫里。

  • 修改登錄密碼

  用到的命令如下:

 

mysqladmin -u root password 'new-password'

格式:mysqladmin -u用戶名 -p舊密碼 password 新密碼

 

  我們這里設置MySQL數據庫"root"用戶的密碼為"hadoop"。執行的命令如下:

 

mysqladmin –u root password hadoop

 

  

 

  • 測試是否修改成功

  (1)不用密碼登錄

 

  

  此時顯示錯誤,說明密碼已經修改。

  (2)用修改后的密碼登錄

 

  

  從上圖中得知,我們已經成功修改了密碼,並且用新的密碼登錄了MySQL服務器。

  第八步:配置防火牆

  第一種:修改防火牆配置文件"/etc/sysconfig/iptables",添加如下內容:

 

-A INPUT -m state --state NEW -m tcp -p tcp --sport 3306 -j ACCEPT

-A OUTPUT -m state --state NEW -m tcp -p tcp --dport 3306 -j ACCEPT

 

  然后執行下面命令,使防火牆立即生效。

 

service iptables restart

 

  第二種:關閉防火牆

  通過下面兩個命令使防火牆關閉,並且永遠不起作用。

 

service iptables stop

chkconfig iptables off

 

  我們在這里為了方便,采用第二種方法,執行效果如下。

 

  

    第九步:驗證MySQL數據庫編碼是否為UTF-8。

  連接上數據庫之后,輸入命令:"SHOW VARIABLES LIKE '%char%';"即可查看到現在你的數據庫所使用的字符集了。

 

  

  第十步:刪除空用戶,增強安全。

  目前為止我們都是以"root"的身份進行的,但是當我們切換至普通用戶登錄MySQL時,直接輸入"mysql"就進去了,我們剛才不是設置密碼了嗎?怎么就失效了呢?說明有空用戶存在。先用命令"exit"退出,在按照下面命令進行修正。

 

  

 

     解決步驟如下:

  • 以MySQL用戶"root"用密碼形式登錄。

 

mysql -u root -p

 

  • 刪除空用戶,強烈建議。

 

mysql>delete from mysql.user where user='';

 

  • 刷新權限表,以便可以使更改立即生效。

 

mysql>flush privileges;

 

  • 輸入"exit",退出MySQL。

 

mysql>exit

 

  • 再重新以"mysql"登錄測試

 

mysql

 

     發現以"mysql"登錄已經失效,必須以"mysql –u root -p"才能登錄。

 

     下面是執行效果截圖:

 

  

2、MapReduce與MySQL交互

  MapReduce技術推出后,曾遭到關系數據庫研究者的挑剔和批評,認為MapReduce不具備有類似於關系數據庫中的結構化數據存儲和處理能力。為此,Google和MapReduce社區進行了很多努力。一方面,他們設計了類似於關系數據中結構化數據表的技術(Google的BigTable,Hadoop的HBase)提供一些粗粒度的結構化數據存儲和處理能力;另一方面,為了增強與關系數據庫的集成能力,Hadoop MapReduce提供了相應的訪問關系數據庫庫的編程接口。

  MapReduce與MySQL交互的整體架構如下圖所示。

 

圖2-1整個環境的架構

  具體到MapReduce框架讀/寫數據庫,有2個主要的程序分別是 DBInputFormatDBOutputFormat,DBInputFormat 對應的是SQL語句select,而DBOutputFormat 對應的是 Inster/update,使用DBInputFormat和DBOutputForma時候需要實現InputFormat這個抽象類,這個抽象類含有getSplits()和createRecordReader()抽象方法,在DBInputFormat類中由 protected String getCountQuery() 方法傳入結果集的個數,getSplits()方法再確定輸入的切分原則,利用SQL中的 LIMIT 和 OFFSET 進行切分獲得數據集的范圍 ,請參考DBInputFormat源碼中public InputSplit[] getSplits(JobConf job, int chunks) throws IOException的方法,在DBInputFormat源碼中createRecordReader()則可以按一定格式讀取相應數據。

      1)建立關系數據庫連接

  • DBConfiguration:提供數據庫配置和創建連接的接口。

      DBConfiguration類中提供了一個靜態方法創建數據庫連接:

 

public static void configureDB(Job job,String driverClass,String dbUrl,String userName,String Password)

 

      其中,job為當前准備執行的作業,driverClasss為數據庫廠商提供的訪問其數據庫的驅動程序,dbUrl為運行數據庫的主機的地址,userName和password分別為數據庫提供訪問地用戶名和相應的訪問密碼。

      2)相應的從關系數據庫查詢和讀取數據的接口

  • DBInputFormat:提供從數據庫讀取數據的格式。
  • DBRecordReader:提供讀取數據記錄的接口。

  3)相應的向關系數據庫直接輸出結果的編程接口

  • DBOutputFormat:提供向數據庫輸出數據的格式。
  • DBRecordWrite:提供數據庫寫入數據記錄的接口。

  數據庫連接完成后,即可完成從MapReduce程序向關系數據庫寫入數據的操作。為了告知數據庫將寫入哪個表中的哪些字段,DBOutputFormat中提供了一個靜態方法來指定需要寫入的數據表和字段:

 

public static void setOutput(Job job,String tableName,String ... fieldName)

 

      其中,tableName指定即將寫入的數據表,后續參數將指定哪些字段數據將寫入該表。

2.1 從數據庫中輸入數據

      雖然Hadoop允許從數據庫中直接讀取數據記錄作為MapReduce的輸入,但處理效率較低,而且大量頻繁地從MapReduce程序中查詢讀取關系數據庫可能會大大增加數據庫訪問負載,因此DBInputFormat僅適合讀取小量數據記錄計算和應用不適合數據倉庫聯機數據分析大量數據讀取處理

      讀取大量數據記錄一個更好的解決辦法是:用數據庫中的Dump工具將大量待分析數據輸出文本數據文件,並上載到HDFS中進行處理。

 

      1)首先創建要讀入的數據

  • Windows環境

  首先創建數據庫"school",使用下面命令進行:

 

create database school;

 

      然后通過以下幾句話,把我們事先准備好的sql語句(student.sql事先放到了D盤目錄)導入到剛創建的"school"數據庫中。用到的命令如下:

 

use school;

source d:\student.sql

 

      "student.sql"中的內容如下所示:

 

DROP TABLE IF EXISTS `school`.`student`;

 

CREATE TABLE `school`.`student` (

`id` int(11) NOT NULL default '0',

`name` varchar(20) default NULL,

`sex` varchar(10) default NULL,

`age` int(10) default NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

INSERT INTO `student` VALUES ('201201', '張三', '男', '21');

INSERT INTO `student` VALUES ('201202', '李四', '男', '22');

INSERT INTO `student` VALUES ('201203', '王五', '女', '20');

INSERT INTO `student` VALUES ('201204', '趙六', '男', '21');

INSERT INTO `student` VALUES ('201205', '小紅', '女', '19');

INSERT INTO `student` VALUES ('201206', '小明', '男', '22');

 

      執行結果如下所示:

 

 

      查詢剛才創建的數據庫表"student"的內容。

 

 

      結果發現顯示是亂碼,記得我當時是設置的UTF-8,怎么就出現亂碼了呢?其實我們使用的操作系統的系統為中文,且它的默認編碼是gbk,而MySQL的編碼有兩種,它們分別是:

  【client】:客戶端的字符集。客戶端默認字符集。當客戶端向服務器發送請求時,請求以該字符集進行編碼。

  【mysqld】:服務器字符集,默認情況下所采用的。

 

      找到安裝MySQL目錄,比如我們的安裝目錄為:

 

E:\HadoopWorkPlat\MySQL Server 5.5

 

      從中找到"my.ini"配置文件,最終發現my.ini里的2個character_set把client改成gbk,把server改成utf8就可以了。

    【client】端:

 

[client]

port=3306

[mysql]

default-character-set=gbk

 

    【mysqld】端:

 

[mysqld]

# The default character set that will be used when a new schema or table is

# created and no character set is defined

character-set-server=utf8

 

      按照上面修改完之后,重啟MySQL服務。

 

 

      此時在Windows下面的數據庫表已經准備完成了。

 

  • Linux環境

  首先通過"FlashFXP"把我們剛才的"student.sql"上傳到"/home/hadoop"目錄下面,然后按照上面的語句創建"school"數據庫。

 

  

      查看我們上傳的"student.sql"內容:

 

  

      創建"school"數據庫,並導入"student.sql"語句。

 

  

 

      顯示數據庫"school"中的表"student"信息。

 

  

     顯示表"student"中的內容。

 

  

 

      到此為止在"Windows"和"Linux"兩種環境下面都創建了表"student"表,並初始化了值。下面就開始通過MapReduce讀取MySQL庫中表"student"的信息。

      2)使MySQL能遠程連接

      MySQL默認是允許別的機器進行遠程訪問地,為了使Hadoop集群能訪問MySQL數據庫,所以進行下面操作。

  • 用MySQL用戶"root"登錄。

 

mysql -u root -p

 

  • 使用下面語句進行授權,賦予任何主機訪問數據的權限。

 

GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'hadoop' WITH GRANT OPTION;

 

  • 刷新,使之立即生效。

 

FLUSH PRIVILEGES;

 

      執行結果如下圖。

      Windows下面:

 

  

      Linux下面:

 

  

      到目前為止,如果連接Win7上面的MySQL數據庫還不行,大家還應該記得前面在Linux下面關掉了防火牆但是我們在Win7下對防火牆並沒有做任何處理,如果不對防火牆做處理,即使執行了上面的遠程授權,仍然不能連接。下面是設置Win7上面的防火牆,使遠程機器能通過3306端口訪問MySQL數據庫。

      解決方案:只要在'入站規則'上建立一個3306端口即可。

  執行順序控制面板à管理工具à高級安全的Windows防火牆à入站規則

  然后新建規則à選擇'端口'à在'特定本地端口'上輸入一個'3306' à選擇'允許連接'=>選擇'域'、'專用'、'公用'=>給個名稱,如:MySqlInput

 

      3)對JDBC的Jar包處理

      因為程序雖然用Eclipse編譯運行但最終要提交到Hadoop集群上,所以JDBC的jar必須放到Hadoop集群中。有兩種方式:

      (1)在每個節點下的${HADOOP_HOME}/lib下添加該包,重啟集群,一般是比較原始的方法。

      我們的Hadoop安裝包在"/usr/hadoop",所以把Jar放到"/usr/hadoop/lib"下面,然后重啟,記得是Hadoop集群中所有的節點都要放,因為執行分布式是程序是在每個節點本地機器上進行。

      (2)在Hadoop集群的分布式文件系統中創建"/lib"文件夾,並把我們的的JDBC的jar包上傳上去,然后在主程序添加如下語句,就能保證Hadoop集群中所有的節點都能使用這個jar包。因為這個jar包放在了HDFS上,而不是本地系統,這個要理解清楚。

 

DistributedCache.addFileToClassPath(new Path("/lib/mysql-connector-java-5.1.18-bin.jar"), conf);

 

      我們用的JDBC的jar如下所示:

 

mysql-connector-java-5.1.18-bin.jar

 

      通過Eclipse下面的DFS Locations進行創建"/lib"文件夾,並上傳JDBC的jar包。執行結果如下:

  

      備注我們這里采用第二種方式

      4)源程序代碼如下所示

 

package com.hebut.mr;

 

import java.io.IOException;

import java.io.DataInput;

import java.io.DataOutput;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

 

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.lib.IdentityReducer;

import org.apache.hadoop.mapred.lib.db.DBWritable;

import org.apache.hadoop.mapred.lib.db.DBInputFormat;

import org.apache.hadoop.mapred.lib.db.DBConfiguration;

 

public class ReadDB {

 

    public static class Map extends MapReduceBase implements

            Mapper<LongWritable, StudentRecord, LongWritable, Text> {

 

        // 實現map函數

        public void map(LongWritable key, StudentRecord value,

        OutputCollector<LongWritable, Text> collector, Reporter reporter)

                throws IOException {

            collector.collect(new LongWritable(value.id),

                    new Text(value.toString()));

        }

 

    }

 

    public static class StudentRecord implements Writable, DBWritable {

        public int id;

        public String name;

        public String sex;

        public int age;

 

        @Override

        public void readFields(DataInput in) throws IOException {

            this.id = in.readInt();

            this.name = Text.readString(in);

            this.sex = Text.readString(in);

            this.age = in.readInt();

        }

 

        @Override

        public void write(DataOutput out) throws IOException {

            out.writeInt(this.id);

            Text.writeString(out, this.name);

            Text.writeString(out, this.sex);

            out.writeInt(this.age);

        }

 

        @Override

        public void readFields(ResultSet result) throws SQLException {

            this.id = result.getInt(1);

            this.name = result.getString(2);

            this.sex = result.getString(3);

            this.age = result.getInt(4);

        }

 

        @Override

        public void write(PreparedStatement stmt) throws SQLException {

            stmt.setInt(1, this.id);

            stmt.setString(2, this.name);

            stmt.setString(3, this.sex);

            stmt.setInt(4, this.age);

        }

 

        @Override

        public String toString() {

            return new String("學號:" + this.id + "_姓名:" + this.name

                    + "_性別:"+ this.sex + "_年齡:" + this.age);

        }

    }

 

    public static void main(String[] args) throws Exception {

 

        JobConf conf = new JobConf(ReadDB.class);

 

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

 

        // 非常重要,值得關注

        DistributedCache.addFileToClassPath(new Path(

         "/lib/mysql-connector-java-5.1.18-bin.jar"), conf);

 

        // 設置輸入類型

        conf.setInputFormat(DBInputFormat.class);

 

        // 設置輸出類型

        conf.setOutputKeyClass(LongWritable.class);

        conf.setOutputValueClass(Text.class);

 

        // 設置MapReduce

        conf.setMapperClass(Map.class);

        conf.setReducerClass(IdentityReducer.class);

 

        // 設置輸出目錄

        FileOutputFormat.setOutputPath(conf, new Path("rdb_out"));

 

        // 建立數據庫連接

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",

            "jdbc:mysql://192.168.1.24:3306/school", "root", "hadoop");

 

        // 讀取"student"表中的數據

        String[] fields = { "id", "name", "sex", "age" };

        DBInputFormat.setInput(conf, StudentRecord.class, "student", null,"id", fields);

 

        JobClient.runJob(conf);

    }

}

 

      備注:由於Hadoop1.0.0新的API對關系型數據庫暫不支持,只能用舊的API進行,所以下面的"向數據庫中輸出數據"也是如此。

 

      5)運行結果如下所示

      經過上面的設置后,已經通過連接Win7和Linux上的MySQL數據庫,執行結果都一樣。唯獨變得就是代碼中"DBConfiguration.configureDB"中MySQL數據庫所在機器的IP地址。

 

 

2.2 向數據庫中輸出數據

      基於數據倉庫數據分析挖掘輸出結果的數據量一般不會太大,因而可能適合直接向數據庫寫入我們這里嘗試與"WordCount"程序相結合,把單詞統計的結果存入到關系型數據庫中。

      1)創建寫入的數據庫表

      我們還使用剛才創建的數據庫"school",只是在里添加一個新的表"wordcount",還是使用下面語句執行:

 

use school;

source sql腳本全路徑

 

      下面是要創建的"wordcount"表的sql腳本。

 

DROP TABLE IF EXISTS `school`.`wordcount`;

 

CREATE TABLE `school`.`wordcount` (

`id` int(11) NOT NULL auto_increment,

`word` varchar(20) default NULL,

`number` int(11) default NULL,

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

 

      執行效果如下所示:

  • Windows環境

  • Linux環境

 

      2)程序源代碼如下所示

 

package com.hebut.mr;

 

import java.io.IOException;

import java.io.DataInput;

import java.io.DataOutput;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.util.Iterator;

import java.util.StringTokenizer;

 

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

import org.apache.hadoop.mapred.lib.db.DBWritable;

import org.apache.hadoop.mapred.lib.db.DBConfiguration;

 

public class WriteDB {

    // Map處理過程

    public static class Map extends MapReduceBase implements

            Mapper<Object, Text, Text, IntWritable> {

 

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

 

        @Override

        public void map(Object key, Text value,

            OutputCollector<Text, IntWritable> output, Reporter reporter)

                throws IOException {

            String line = value.toString();

            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {

                word.set(tokenizer.nextToken());

                output.collect(word, one);

            }

        }

    }

 

    // Combine處理過程

    public static class Combine extends MapReduceBase implements

            Reducer<Text, IntWritable, Text, IntWritable> {

 

        @Override

        public void reduce(Text key, Iterator<IntWritable> values,

            OutputCollector<Text, IntWritable> output, Reporter reporter)

                throws IOException {

            int sum = 0;

            while (values.hasNext()) {

                sum += values.next().get();

            }

            output.collect(key, new IntWritable(sum));

        }

    }

 

    // Reduce處理過程

    public static class Reduce extends MapReduceBase implements

            Reducer<Text, IntWritable, WordRecord, Text> {

 

        @Override

        public void reduce(Text key, Iterator<IntWritable> values,

            OutputCollector<WordRecord, Text> collector, Reporter reporter)

                throws IOException {

 

            int sum = 0;

            while (values.hasNext()) {

                sum += values.next().get();

            }

 

            WordRecord wordcount = new WordRecord();

            wordcount.word = key.toString();

            wordcount.number = sum;

 

            collector.collect(wordcount, new Text());

        }

    }

 

    public static class WordRecord implements Writable, DBWritable {

        public String word;

        public int number;

 

        @Override

        public void readFields(DataInput in) throws IOException {

            this.word = Text.readString(in);

            this.number = in.readInt();

        }

 

        @Override

        public void write(DataOutput out) throws IOException {

            Text.writeString(out, this.word);

            out.writeInt(this.number);

        }

 

        @Override

        public void readFields(ResultSet result) throws SQLException {

            this.word = result.getString(1);

            this.number = result.getInt(2);

        }

 

        @Override

        public void write(PreparedStatement stmt) throws SQLException {

            stmt.setString(1, this.word);

            stmt.setInt(2, this.number);

        }

    }

 

    public static void main(String[] args) throws Exception {

 

        JobConf conf = new JobConf(WriteDB.class);

 

        // 這句話很關鍵

        conf.set("mapred.job.tracker", "192.168.1.2:9001");

 

        DistributedCache.addFileToClassPath(new Path(

                "/lib/mysql-connector-java-5.1.18-bin.jar"), conf);

 

        // 設置輸入輸出類型

        conf.setInputFormat(TextInputFormat.class);

        conf.setOutputFormat(DBOutputFormat.class);

        // 不加這兩句,通不過,但是網上給的例子沒有這兩句。

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

 

        // 設置MapReduce

        conf.setMapperClass(Map.class);

        conf.setCombinerClass(Combine.class);

        conf.setReducerClass(Reduce.class);

 

        // 設置輸如目錄

        FileInputFormat.setInputPaths(conf, new Path("wdb_in"));

 

        // 建立數據庫連接

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",

            "jdbc:mysql://192.168.1.24:3306/school", "root", "hadoop");

 

        // 寫入"wordcount"表中的數據

        String[] fields = { "word", "number" };

        DBOutputFormat.setOutput(conf, "wordcount", fields);

 

        JobClient.runJob(conf);

    }

}

 

      3)運行結果如下所示

  • Windows環境

  測試數據:

(1)file1.txt

 

hello word

hello hadoop

 

    (2)file2.txt

 

蝦皮 hadoop

蝦皮 word

軟件 軟件

 

      運行結果:

 

      我們發現上圖中出現了"?",后來查找原來是因為我的測試數據時在Windows用記事本寫的然后保存為"UTF-8",在保存時為了區分編碼,自動在前面加了一個"BOM",但是不會顯示任何結果。然而我們的代碼把它識別為"?"進行處理。這就出現了上面的結果,如果我們在每個要處理的文件前面的第一行加一個空格,結果就成如下顯示:

 

      接着又做了一個測試,在Linux上面用下面命令創建了一個文件,並寫上中文內容。結果顯示並沒有出現"?",而且網上說不同的記事本軟件(EmEditor、UE)保存為"UTF-8"就沒有這個問題。經過修改之后的Map類,就能夠正常識別了。

 

    // Map處理過程

    public static class Map extends MapReduceBase implements

            Mapper<Object, Text, Text, IntWritable> {

 

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

 

        @Override

        public void map(Object key, Text value,

            OutputCollector<Text, IntWritable> output, Reporter reporter)

                throws IOException {

            String line = value.toString();

           

            //處理記事本UTF-8的BOM問題

            if (line.getBytes().length > 0) {

                if ((int) line.charAt(0) == 65279) {

                    line = line.substring(1);

                }

            }

           

            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {

                word.set(tokenizer.nextToken());

                output.collect(word, one);

            }

        }

    }

 

      處理之后的結果:

 

 

      從上圖中得知,我們的問題已經解決了,因此,在編輯、更改任何文本文件時,請務必使用不會亂加BOM的編輯器。Linux下的編輯器應該都沒有這個問題。Windows下,請勿使用記事本等編輯器。推薦的編輯器是: Editplus 2.12版本以上; EmEditor; UltraEdit(需要取消'添加BOM'的相關選項); Dreamweaver(需要取消'添加BOM'的相關選項) 等。

  對於已經添加了BOM的文件,要取消的話,可以用以上編輯器另存一次。(Editplus需要先另存為gb,再另存為UTF-8。) DW解決辦法如下: 用DW打開指定文件,按Ctrl+Jà標題/編碼à編碼選擇"UTF-8",去掉"包括Unicode簽名(BOM)"勾選à保存/另存為,即可。

    國外有一個牛人已經把這個問題解決了,使用"UnicodeInputStream"、"UnicodeReader"。

    地址:http://koti.mbnet.fi/akini/java/unicodereader/

    示例:Java讀帶有BOM的UTF-8文件亂碼原因及解決方法

    代碼:http://download.csdn.net/detail/xia520pi/4146123

 

  • Linux環境

  測試數據:

    (1)file1.txt

 

MapReduce is simple

 

    (2)file2.txt

 

MapReduce is powerful is simple

 

    (3)file2.txt

 

Hello MapReduce bye MapReduce

 

      運行結果:

 

 

      到目前為止,MapReduce與關系型數據庫交互已經結束,從結果中得知,目前新版的API還不能很好的支持關系型數據庫的操作,上面兩個例子都是使用的舊版的API。關於更多的MySQL操作,具體參考"Hadoop集群_第10期副刊_常用MySQL數據庫命令_V1.0"。

 

      本期歷時五天,終於完成,期間遇到的關鍵問題如下:

 

  • MySQL的JDBC的jar存放問題。
  • Win7對MySQL防火牆的設置。
  • Linux中MySQL變更目錄不能啟動。
  • MapReduce處理帶BOM的UTF-8問題。
  • 設置MySQL可以遠程訪問。
  • MySQL處理中文亂碼問題。

 

  從這幾天對MapReduce的了解,發現其實Hadoop對關系型數據庫的處理還不是很強,主要是Hadoop和關系型數據做的事不是同一類型,各有所特長。下面幾期我們將對Hadoop里的HBase和Hive進行全面了解。

 

  文章下載地址:http://files.cnblogs.com/xia520pi/HadoopCluster_Vol.10.rar

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM