業務需求說明,由於公司數據中心處於剛開始部署的階段,這需要涉及其它部分將數據全部匯總到數據中心,這實現的方式是同上傳json文件,通過采用socket&serversocket實現傳輸。
其中,服務端采用多線程的方式,實現多用戶傳輸的目的。並且實現可以將數據寫入到hbase中。
具體步驟如下:
1、首先編寫客戶端的代碼:
package com.yiban.datacenter.ToHbaseFromJson; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.Socket; import java.net.UnknownHostException; public class hbaseclient { public static void main(String[] args) { // TODO Auto-generated method stub try { // 創建客戶端的socket Socket s = new Socket("192.168.27.47", 22222); // 首先確認連接上了我的服務器,通過接受服務器發送的確認信息 BufferedWriter firstclientwrite = new BufferedWriter( new OutputStreamWriter(s.getOutputStream())); firstclientwrite.write("我准備向你發送數據了,你准備好接收了嗎?"); firstclientwrite.newLine(); firstclientwrite.flush(); // 經通道內的字節輸入流進行一個封裝程字符流,方便直接輸出 BufferedReader testconnection = new BufferedReader( new InputStreamReader(s.getInputStream())); // 輸出結果 String sss = testconnection.readLine(); System.out.println(sss); //發送表名和列族名 firstclientwrite.write("nihao"); firstclientwrite.newLine(); firstclientwrite.flush(); //確定表發送是否成功 System.out.println(testconnection.readLine()); // 封裝客戶端的文本文件 BufferedReader clientread = new BufferedReader(new FileReader( "file.json")); // 准備將客戶端的字符流寫入到對應的通道內,OutputStreamWriter是將字符流轉換成字節流, // BufferedWriter封裝字符流 BufferedWriter clientwirte = new BufferedWriter( new OutputStreamWriter(s.getOutputStream())); String line = null; while ((line = clientread.readLine()) != null) { clientwirte.write(line); clientwirte.newLine(); clientwirte.flush(); } // 提示發送完成 s.shutdownOutput(); // 准備接收一個反饋 // 經通道內的字節輸入流進行一個封裝程字符流,方便直接輸出 BufferedReader ctread = new BufferedReader(new InputStreamReader( s.getInputStream())); // 輸出結果 String fackcall = null; while ((fackcall = ctread.readLine()) != null) { System.out.println(fackcall); } // 釋放資源 clientread.close(); s.close(); } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
2、服務端的代碼:
(1)線程類的實現
package com.yiban.datacenter.ToHbaseFromJson; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.NavigableMap; import java.util.Map.Entry; import net.sf.json.JSONArray; import net.sf.json.JSONObject; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; public class UserThread implements Runnable { private String testconnect = "我准備向你發送數據了,你准備好接收了嗎?"; // 這個可以用來驗證用戶名和密碼 private static Configuration conf = HBaseConfiguration.create(); private static Connection connection = null; // 配置信息 static { try { conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.27.233"); conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181); connection = ConnectionFactory.createConnection(conf); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } private Socket s; public UserThread(Socket s) { this.s = s; } private String userTableName = null; private String columnFamilyName = null; @Override public void run() { // TODO Auto-generated method stub try { // 將通道內的字節流轉換成字符流,並用bufferedreader進行封裝,InputStreamReader是將字節流轉換成字符流 BufferedReader serverread = new BufferedReader( new InputStreamReader(s.getInputStream())); // 詢問客戶端連接是否准備好,接受客戶端的連接請求 String line = serverread.readLine(); // 阻塞 System.out.println(line);// 輸出客戶端的連接請求 // 將通道內的字符寫入到對應的文件中,利用bufferedwrite進行封裝,FileWriter是將字符流寫入到文件中 BufferedWriter serverwrite = new BufferedWriter( new OutputStreamWriter(s.getOutputStream())); if (line.equals(testconnect)) { serverwrite.write("連接成功,你可以發送數據了,發送數據前,請先發送你要用的數據庫表名!"); serverwrite.newLine(); serverwrite.flush(); } else { serverwrite.write("連接失敗!"); serverwrite.newLine(); serverwrite.flush(); } // 准備接收表名和列族名 userTableName = serverread.readLine(); System.out.println("tablename:" + userTableName);// 輸出客戶端的連接請求 // 告訴客戶端,我接受成功 if (TableIsExist(userTableName)) { serverwrite.write("接收表名成功"); serverwrite.newLine(); serverwrite.flush(); } else { serverwrite.write("表不存在"); serverwrite.newLine(); serverwrite.flush(); } // 循環讀取客戶端的數據 line = ""; StringBuffer temp = new StringBuffer(line); while ((line = serverread.readLine()) != null) { temp.append(line); } // 對json文件進行解析 JSONArray jsonArray = JSONArray.fromObject(temp.toString()); // 解析之后進行輸出,在這里可以直接寫入到hbase中 PrintJsonArray(jsonArray); getAllTables(conf); // 將接收到的數據寫入hbase中的表中 insertData(jsonArray, userTableName); // 給出一個反饋,提示數據上傳成功 // 封裝通道內的輸出流,方便對他進行寫字符數據 BufferedWriter bwserver = new BufferedWriter( new OutputStreamWriter(s.getOutputStream())); bwserver.write("文件上傳成功!"); // bwserver.newLine(); bwserver.flush(); bwserver.close(); // 釋放資源 // serverwrite.close(); s.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @SuppressWarnings("deprecation") private void insertData(JSONArray jsonArray, String userTableName) { // TODO Auto-generated method stub Table table = null; try { table = connection.getTable(TableName.valueOf(userTableName)); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } List<Put> putlist = new ArrayList<Put>(); for (int i = 0; i < jsonArray.size(); i++) { JSONObject jsonobject = jsonArray.getJSONObject(i); Put put = new Put(Bytes.toBytes(jsonobject.getString("DocumentID")));// 指定行,也就是鍵值 // 參數分別:列族、列、值 put.add(Bytes.toBytes("info"), Bytes.toBytes("DocumentContent"), Bytes.toBytes(jsonobject.getString("DocumentContent"))); putlist.add(put); } try { table.put(putlist); table.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private boolean TableIsExist(String userTableName2) { boolean flag = false; try { // Connection connection = ConnectionFactory.createConnection(conf); Admin ad = connection.getAdmin(); if (ad.tableExists(TableName.valueOf(userTableName2))) { flag = true; System.out.println("表存在"); } else { System.out.println("表不存在"); } } catch (Exception e) { // TODO: handle exception } return flag; // TODO Auto-generated method stub } private void PrintJsonArray(JSONArray jsonArray) { int size = jsonArray.size(); System.out.println("Size: " + size); for (int i = 0; i < size; i++) { JSONObject jsonObject = jsonArray.getJSONObject(i); System.out.println("[" + i + "]id=" + jsonObject.get("id")); System.out.println("[" + i + "]name=" + jsonObject.get("name")); System.out.println("[" + i + "]role=" + jsonObject.get("role")); } } // create table private void createTable(Configuration conf) { // HBaseAdmin ha=new HBaseAdmin(conf); try { // Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf(userTableName)); Admin ad = connection.getAdmin(); // TableName name= TableName.valueOf(Bytes.toBytes(tablename));//表名 HTableDescriptor desc = new HTableDescriptor(table.getName()); HColumnDescriptor family = new HColumnDescriptor( Bytes.toBytes(columnFamilyName));// 列簇 desc.addFamily(family); ad.createTable(desc); ad.close(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } // Hbase獲取所有的表信息 public static List getAllTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { HBaseAdmin ad = new HBaseAdmin(conf); List<String> tables = null; if (ad != null) { try { HTableDescriptor[] allTable = ad.listTables(); if (allTable.length > 0) tables = new ArrayList<String>(); for (HTableDescriptor hTableDescriptor : allTable) { tables.add(hTableDescriptor.getNameAsString()); System.out.println(hTableDescriptor.getNameAsString()); } } catch (IOException e) { e.printStackTrace(); } } return tables; } // 按順序輸出 public void printResult(Result rs) { if (rs.isEmpty()) { System.out.println("result is empty!"); return; } // new API and print Map of families to all versions of its qualifiers // and values. NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs .getMap(); String rowkey = Bytes.toString(rs.getRow()); // actain rowkey System.out.println("rowkey->" + rowkey); for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps .entrySet()) { System.out.print("\tfamily->" + Bytes.toString(temp.getKey())); for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp .getValue().entrySet()) { System.out.print("\tcol->" + Bytes.toString(value.getKey())); for (Entry<Long, byte[]> va : value.getValue().entrySet()) { System.out.print("\tvesion->" + va.getKey()); System.out.print("\tvalue->" + Bytes.toString(va.getValue())); System.out.println(); } } } } }
(2)主函數的實現
package com.yiban.datacenter.ToHbaseFromJson; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; public class HbaseServer { public static void main(String[] args) { // TODO Auto-generated method stub try { @SuppressWarnings("resource") ServerSocket ss=new ServerSocket(22222); while(true){ Socket s=ss.accept(); new Thread(new UserThread3(s)).start(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }