用java代碼調用shell腳本執行sqoop將hive表中數據導出到mysql


1:創建shell腳本 

1 touch sqoop_options.sh
2 chmod 777 sqoop_options.sh

編輯文件  特地將執行map的個數設置為變量  測試 可以java代碼傳參數 同時也驗證sqoop的 options 屬性支持這種寫法

1 #!/bin/bash
2 /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/sqoop --options-file /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/sqoop-import-mysql.txt --num-mappers $1
3 if [ $? -eq 0 ];then
4  echo "success"
5 else
6  echo "error"
7 fi

2:創建  sqoop-import-mysql.txt 文件並編輯

touch sqoop-import-mysql.txt
 1 export
 2 --connect
 3 jdbc:mysql://172.16.71.27:3306/babasport
 4 --username
 5 root
 6 --password
 7 root
 8 --table
 9 test_hive
10 --export-dir
11 /user/hive/warehouse/hive_bbs_product_snappy
12 --input-fields-terminated-by
13 '\t'

 

hive數據存在hdfs位置

 

3:開始寫java后台代碼   目前只支持 window寫法 后期加上linux調用shell腳本的寫法

 1 package com.liveyc.common.utils;
 2 
 3 import java.util.Properties;
 4 
 5 import org.apache.commons.logging.Log;
 6 import org.apache.commons.logging.LogFactory;
 7 
 8 public  class FileToHbase {  
 9     /**
10      * shell腳本執行成功標識
11      */
12     public static int SHELL_EXIT_OK = 0;
13     public static Log log = LogFactory.getLog(FileToHbase.class);
14     public static String connIp = "172.16.71.120";
15     public static String connUser = "root";
16     public static String connPwd = "123456";
17     
18     public static void main(String[] args) throws Exception {
19         boolean result = export();
20         System.out.println(result);
21     }
22     
23     public static boolean export() throws Exception {
24         boolean result = false;
25         // 如果當前系統是window系統需要遠程ssh連接系統
26         if (isWinSystem()) {
27             ConnectShell connectShell = new ConnectShell(connIp, connUser, connPwd, "utf-8");
28             String url = "/opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/sqoop_options.sh" + " " +1;
29             result = connectShell.excuteShellCommand(url);
30         } 
31         return result;
32     }
33 
34     /**
35      * 當前操作系統類型
36      * 
37      * @return true 為windos系統,false為linux系統
38      */
39     public static boolean isWinSystem() {
40         // 獲取當前操作系統類型
41         Properties prop = System.getProperties();
42         String os = prop.getProperty("os.name");
43         if (os.startsWith("win") || os.startsWith("Win")) {
44             return true;
45         } else {
46             return false;
47         }
48     }
49 }  

 

  1 package com.liveyc.common.utils;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.IOException;
  5 import java.io.InputStream;
  6 import java.io.InputStreamReader;
  7 import java.io.UnsupportedEncodingException;
  8 import java.nio.charset.Charset;
  9 
 10 import org.apache.commons.logging.Log;
 11 import org.apache.commons.logging.LogFactory;
 12 
 13 import ch.ethz.ssh2.ChannelCondition;
 14 import ch.ethz.ssh2.Connection;
 15 import ch.ethz.ssh2.Session;
 16 import ch.ethz.ssh2.StreamGobbler;
 17 
 18 /**
 19  * 
 20  * ConnectShell
 21  * 
 22  * @Description:連接Shell腳本所在服務器
 23  * @author:aitf
 24  * @date: 2016年3月31日
 25  *
 26  */
 27 public class ConnectShell {
 28     private Connection conn;
 29     private String ipAddr;
 30     private String userName;
 31     private String password;
 32     private String charset = Charset.defaultCharset().toString();
 33     private static final int TIME_OUT = 1000 * 5 * 60;
 34     public static Log log = LogFactory.getLog(ConnectShell.class);
 35 
 36     public ConnectShell(String ipAddr, String userName, String password, String charset) {
 37         this.ipAddr = ipAddr;
 38         this.userName = userName;
 39         this.password = password;
 40         if (charset != null) {
 41             this.charset = charset;
 42         }
 43     }
 44 
 45     public boolean login() throws IOException {
 46         conn = new Connection(ipAddr);
 47         conn.connect();
 48         return conn.authenticateWithPassword(userName, password); // 認證
 49     }
 50 
 51     /**
 52      * 
 53      * @Title: excuteShellCommand
 54      * @Description: 執行shell腳本命令
 55      * @param shellpath
 56      * @return
 57      */
 58     public boolean excuteShellCommand(String shellpath) {
 59         InputStream in = null;
 60         boolean result = false;
 61         String str = "";
 62         try {
 63             if (this.login()) {
 64                 Session session = conn.openSession();
 65                  //session.execCommand("cd /root");
 66                 session.execCommand(shellpath);
 67                 in = new StreamGobbler(session.getStdout());
 68                 // in = session.getStdout();
 69                  str = this.processStdout(in, charset);
 70                 session.waitForCondition(ChannelCondition.EXIT_STATUS, TIME_OUT);
 71                 session.close();
 72                 conn.close();  
 73                 if (str.contains("success")) {
 74                     result = true;
 75                 }else{
 76                     result = false;
 77                 }
 78             } 
 79         } catch (IOException e1) {
 80             e1.printStackTrace();
 81         }
 82         return result;
 83     }
 84 
 85     public String excuteShellCommand2(String shellpath) throws Exception {
 86         InputStream in = null;
 87         String result = "";
 88         try {
 89             if (this.login()) {
 90                 Process exec = Runtime.getRuntime().exec(shellpath);// ipconfig
 91                 in = exec.getInputStream();
 92                 result = this.processStdout(in, this.charset);
 93             }
 94         } catch (IOException e1) {
 95             e1.printStackTrace();
 96         }
 97         return result;
 98     }
 99 
100     /**
101      * 轉化結果
102      * 
103      * @param in
104      * @param charset
105      * @return
106      * @throws UnsupportedEncodingException
107      */
108     public String processStdout(InputStream in, String charset) throws UnsupportedEncodingException {
109         String line = null;
110         BufferedReader brs = new BufferedReader(new InputStreamReader(in, charset));
111         StringBuffer sb = new StringBuffer();
112         try {
113             while ((line = brs.readLine()) != null) {
114                 sb.append(line + "\n");
115             }
116         } catch (IOException e) {
117             log.error("---轉化出現異常---");
118         }
119         return sb.toString();
120     }
121 
122 }

 

4:開始測試

在mysql創建一個表  hive中數據格式 是  int int String

1 CREATE TABLE test_hive(
2 id INT,
3 brand_id INT,
4 NAME VARCHAR(200)
5 )

 

 執行java main方法 開始測試

觀看8088端口 查看MapReduce的運行狀況 發現正在運行(開心)

 

 執行完畢  

 

 可以看到 只有1個 MapReduce任務 (默認的個數是4個 這樣看來第一步寫的shell腳本 參數是傳遞過來了 sqoop的 options 也支持這種直接指定參數的寫法)

現在轉過來看java代碼

返回值 :

1 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../hbase does not exist! HBase imports will fail.
2 Please set $HBASE_HOME to the root of your HBase installation.
3 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../hcatalog does not exist! HCatalog jobs will fail.
4 Please set $HCAT_HOME to the root of your HCatalog installation.
5 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../accumulo does not exist! Accumulo imports will fail.
6 Please set $ACCUMULO_HOME to the root of your Accumulo installation.
7 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../zookeeper does not exist! Accumulo imports will fail.
8 Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
9 success

 

 

 發現返回 success 說明shell腳本執行成功了  

 

一切執行正常   看下mysql 數據庫表中有沒有數據

 

 

 

 OK 一切正常 , 后期把linux執行shell腳本的語句也補充上 。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM