1:創建shell腳本
# Create the wrapper script and make it executable.
# 755 rather than 777: the script is run as root over SSH, so it must not be world-writable.
touch sqoop_options.sh
chmod 755 sqoop_options.sh
編輯文件:特地將執行 map 的個數設置為變量($1),用來測試能否由 java 代碼傳入參數,同時驗證 sqoop 的 --options-file 是否支持在命令行上再追加參數的寫法。
#!/bin/bash
# Runs the Sqoop job described by sqoop-import-mysql.txt.
#
# Usage: sqoop_options.sh [num_mappers]
#   num_mappers  number of map tasks; defaults to 4 when omitted so that
#                --num-mappers is never passed without a value.
#
# Prints "success" or "error" on stdout (the Java caller searches stdout for
# "success") and exits with Sqoop's own status on failure, so callers may also
# rely on the exit code.

SQOOP_HOME=/opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6
num_mappers="${1:-4}"

if "$SQOOP_HOME/bin/sqoop" --options-file "$SQOOP_HOME/sqoop-import-mysql.txt" --num-mappers "$num_mappers"; then
    echo "success"
else
    status=$?   # exit status of the sqoop command tested above
    echo "error"
    exit "$status"
fi
2:創建 sqoop-import-mysql.txt 文件並編輯
touch sqoop-import-mysql.txt
# Sqoop options file: one option or value per line, in command-line order.
# Tool to run
export
# JDBC connection to the target MySQL database
--connect
jdbc:mysql://172.16.71.27:3306/babasport
--username
root
--password
root
# Target MySQL table
--table
test_hive
# HDFS directory that holds the Hive table data to export
--export-dir
/user/hive/warehouse/hive_bbs_product_snappy
# Field delimiter used by the files under --export-dir
--input-fields-terminated-by
'\t'
其中 --export-dir 指定的是 hive 數據存放在 hdfs 上的位置。
3:開始寫 java 後台代碼。目前只支持在 Windows 上通過 ssh 遠程調用的寫法,後期再加上在 linux 上調用 shell 腳本的寫法。
package com.liveyc.common.utils;

import java.util.Locale;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Entry point that triggers the Sqoop wrapper script on a remote host.
 *
 * <p>Only the Windows path is implemented: the script is started over SSH
 * through {@link ConnectShell}. On any other operating system {@link #export()}
 * logs a warning and returns {@code false}.
 */
public class FileToHbase {
    /**
     * Exit status that marks a successful shell script run.
     */
    public static int SHELL_EXIT_OK = 0;
    public static Log log = LogFactory.getLog(FileToHbase.class);
    // NOTE(review): host and credentials are hard-coded; consider loading them from configuration.
    public static String connIp = "172.16.71.120";
    public static String connUser = "root";
    public static String connPwd = "123456";

    public static void main(String[] args) throws Exception {
        boolean result = export();
        System.out.println(result);
    }

    /**
     * Runs {@code sqoop_options.sh} on the remote host with a single map task.
     *
     * @return {@code true} when the remote command reported success;
     *         {@code false} on failure or when the local OS is not Windows
     * @throws Exception declared for backward compatibility with existing callers
     */
    public static boolean export() throws Exception {
        // Only the Windows path (remote execution over SSH) is implemented.
        if (!isWinSystem()) {
            // Make the unsupported case visible instead of returning false silently.
            log.warn("export() is only implemented for Windows hosts; nothing was executed on os.name="
                    + System.getProperty("os.name"));
            return false;
        }
        ConnectShell connectShell = new ConnectShell(connIp, connUser, connPwd, "utf-8");
        // The trailing argument is the number of map tasks handed to the script as $1.
        String url = "/opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/sqoop_options.sh" + " " + 1;
        return connectShell.excuteShellCommand(url);
    }

    /**
     * Reports the type of the current operating system.
     *
     * @return {@code true} when the {@code os.name} system property starts with
     *         "win" (case-insensitive); {@code false} otherwise, including when
     *         the property is missing
     */
    public static boolean isWinSystem() {
        Properties prop = System.getProperties();
        String os = prop.getProperty("os.name");
        // Locale.ROOT keeps the comparison independent of the default locale.
        return os != null && os.toLowerCase(Locale.ROOT).startsWith("win");
    }
}
package com.liveyc.common.utils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import ch.ethz.ssh2.ChannelCondition;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;

/**
 * Connects to the server that hosts the shell script and runs commands on it
 * over SSH with password authentication.
 *
 * @author aitf
 * @date 2016-03-31
 */
public class ConnectShell {
    private Connection conn;
    private String ipAddr;
    private String userName;
    private String password;
    private String charset = Charset.defaultCharset().toString();
    /** Maximum time to wait for the remote exit status, in milliseconds (5 minutes). */
    private static final int TIME_OUT = 1000 * 5 * 60;
    public static Log log = LogFactory.getLog(ConnectShell.class);

    /**
     * @param ipAddr   host to connect to
     * @param userName SSH user name
     * @param password SSH password
     * @param charset  charset used to decode command output; when {@code null}
     *                 the platform default charset is used
     */
    public ConnectShell(String ipAddr, String userName, String password, String charset) {
        this.ipAddr = ipAddr;
        this.userName = userName;
        this.password = password;
        if (charset != null) {
            this.charset = charset;
        }
    }

    /**
     * Opens the SSH connection and authenticates with the configured password.
     *
     * @return {@code true} when authentication succeeded
     * @throws IOException when connecting or authenticating fails
     */
    public boolean login() throws IOException {
        conn = new Connection(ipAddr);
        conn.connect();
        return conn.authenticateWithPassword(userName, password);
    }

    /**
     * Executes a shell command on the remote host.
     *
     * <p>The command counts as successful when its standard output contains
     * the text {@code success}. The session and connection are always released,
     * including when an exception occurs.
     *
     * @param shellpath command line to execute remotely
     * @return {@code true} when stdout contains "success"; {@code false} when it
     *         does not, when login fails, or when an I/O error occurs
     */
    public boolean excuteShellCommand(String shellpath) {
        Session session = null;
        InputStream in = null;
        InputStream err = null;
        try {
            if (!this.login()) {
                return false;
            }
            session = conn.openSession();
            session.execCommand(shellpath);
            // Drain stderr in the background so a command that writes a lot to
            // stderr cannot stall while only stdout is being read.
            err = new StreamGobbler(session.getStderr());
            in = new StreamGobbler(session.getStdout());
            String str = this.processStdout(in, charset);
            session.waitForCondition(ChannelCondition.EXIT_STATUS, TIME_OUT);
            return str.contains("success");
        } catch (IOException e1) {
            log.error("Failed to execute remote command: " + shellpath, e1);
            return false;
        } finally {
            closeQuietly(in);
            closeQuietly(err);
            if (session != null) {
                session.close();
            }
            closeConnection();
        }
    }

    /**
     * Executes a command as a local process and returns its standard output.
     *
     * <p>NOTE(review): this still logs in to the remote host first and only
     * runs the local command when that login succeeds, as the original did;
     * the SSH connection is not otherwise used here - confirm whether the
     * login is needed at all. The local process's stderr is not consumed.
     *
     * @param shellpath command line to execute locally
     * @return the command's standard output, or an empty string when login
     *         fails or an I/O error occurs
     * @throws Exception declared for backward compatibility with existing callers
     */
    public String excuteShellCommand2(String shellpath) throws Exception {
        InputStream in = null;
        String result = "";
        try {
            if (this.login()) {
                Process exec = Runtime.getRuntime().exec(shellpath);
                in = exec.getInputStream();
                result = this.processStdout(in, this.charset);
            }
        } catch (IOException e1) {
            log.error("Failed to execute local command: " + shellpath, e1);
        } finally {
            closeQuietly(in);
            // The connection opened by login() was previously never closed.
            closeConnection();
        }
        return result;
    }

    /**
     * Reads a stream to its end and returns the content as text, with every
     * line terminated by {@code \n}. The stream is not closed by this method.
     *
     * @param in      stream to read
     * @param charset name of the charset used to decode the stream
     * @return the text read; partial when an I/O error interrupts reading
     * @throws UnsupportedEncodingException when {@code charset} is not supported
     */
    public String processStdout(InputStream in, String charset) throws UnsupportedEncodingException {
        BufferedReader brs = new BufferedReader(new InputStreamReader(in, charset));
        StringBuilder sb = new StringBuilder();
        try {
            String line;
            while ((line = brs.readLine()) != null) {
                sb.append(line).append('\n');
            }
        } catch (IOException e) {
            // Keep whatever was read so far, but record the cause.
            log.error("---轉化出現異常---", e);
        }
        return sb.toString();
    }

    /** Closes the SSH connection if one is open. */
    private void closeConnection() {
        if (conn != null) {
            conn.close();
            conn = null;
        }
    }

    /** Closes a stream, logging rather than propagating any failure. */
    private static void closeQuietly(InputStream stream) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing fails; log for diagnosis only.
            log.debug("Failed to close stream", ignored);
        }
    }
}
4:開始測試
在 mysql 中創建一個表,字段類型與 hive 中的數據格式對應:int、int、String。
-- Target table for the Sqoop export; columns follow the Hive data layout (int, int, string).
CREATE TABLE test_hive (
    id       INT,
    brand_id INT,
    NAME     VARCHAR(200)
)
執行java main方法 開始測試
觀看8088端口 查看MapReduce的運行狀況 發現正在運行(開心)
執行完畢
可以看到只有 1 個 map 任務(默認的個數是 4 個)。這樣看來,第一步寫的 shell 腳本的參數確實傳遞過來了,sqoop 的 --options-file 也支持這種在命令行上直接追加參數的寫法。
現在轉過來看java代碼
返回值 :
1 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../hbase does not exist! HBase imports will fail. 2 Please set $HBASE_HOME to the root of your HBase installation. 3 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../hcatalog does not exist! HCatalog jobs will fail. 4 Please set $HCAT_HOME to the root of your HCatalog installation. 5 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../accumulo does not exist! Accumulo imports will fail. 6 Please set $ACCUMULO_HOME to the root of your Accumulo installation. 7 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../zookeeper does not exist! Accumulo imports will fail. 8 Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation. 9 success
發現返回 success 說明shell腳本執行成功了
一切執行正常 看下mysql 數據庫表中有沒有數據
OK 一切正常 , 后期把linux執行shell腳本的語句也補充上 。