業務需求說明,由於公司數據中心處於剛開始部署的階段,這需要涉及其它部分將數據全部匯總到數據中心,這實現的方式是同上傳json文件,通過采用socket&serversocket實現傳輸。
其中,服務端采用多線程的方式,實現多用戶傳輸的目的。並且實現可以將數據寫入到hbase中。
具體步驟如下:
1、首先編寫客戶端的代碼:
package com.yiban.datacenter.ToHbaseFromJson;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class hbaseclient {
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
// 創建客戶端的socket
Socket s = new Socket("192.168.27.47", 22222);
// 首先確認連接上了我的服務器,通過接受服務器發送的確認信息
BufferedWriter firstclientwrite = new BufferedWriter(
new OutputStreamWriter(s.getOutputStream()));
firstclientwrite.write("我准備向你發送數據了,你准備好接收了嗎?");
firstclientwrite.newLine();
firstclientwrite.flush();
// 經通道內的字節輸入流進行一個封裝程字符流,方便直接輸出
BufferedReader testconnection = new BufferedReader(
new InputStreamReader(s.getInputStream()));
// 輸出結果
String sss = testconnection.readLine();
System.out.println(sss);
//發送表名和列族名
firstclientwrite.write("nihao");
firstclientwrite.newLine();
firstclientwrite.flush();
//確定表發送是否成功
System.out.println(testconnection.readLine());
// 封裝客戶端的文本文件
BufferedReader clientread = new BufferedReader(new FileReader(
"file.json"));
// 准備將客戶端的字符流寫入到對應的通道內,OutputStreamWriter是將字符流轉換成字節流,
// BufferedWriter封裝字符流
BufferedWriter clientwirte = new BufferedWriter(
new OutputStreamWriter(s.getOutputStream()));
String line = null;
while ((line = clientread.readLine()) != null) {
clientwirte.write(line);
clientwirte.newLine();
clientwirte.flush();
}
// 提示發送完成
s.shutdownOutput();
// 准備接收一個反饋
// 經通道內的字節輸入流進行一個封裝程字符流,方便直接輸出
BufferedReader ctread = new BufferedReader(new InputStreamReader(
s.getInputStream()));
// 輸出結果
String fackcall = null;
while ((fackcall = ctread.readLine()) != null) {
System.out.println(fackcall);
}
// 釋放資源
clientread.close();
s.close();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2、服務端的代碼:
(1)線程類的實現
package com.yiban.datacenter.ToHbaseFromJson;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Map.Entry;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class UserThread implements Runnable {
private String testconnect = "我准備向你發送數據了,你准備好接收了嗎?"; // 這個可以用來驗證用戶名和密碼
private static Configuration conf = HBaseConfiguration.create();
private static Connection connection = null;
// 配置信息
static {
try {
conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.27.233");
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
connection = ConnectionFactory.createConnection(conf);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
private Socket s;
public UserThread(Socket s) {
this.s = s;
}
private String userTableName = null;
private String columnFamilyName = null;
@Override
public void run() {
// TODO Auto-generated method stub
try {
// 將通道內的字節流轉換成字符流,並用bufferedreader進行封裝,InputStreamReader是將字節流轉換成字符流
BufferedReader serverread = new BufferedReader(
new InputStreamReader(s.getInputStream()));
// 詢問客戶端連接是否准備好,接受客戶端的連接請求
String line = serverread.readLine(); // 阻塞
System.out.println(line);// 輸出客戶端的連接請求
// 將通道內的字符寫入到對應的文件中,利用bufferedwrite進行封裝,FileWriter是將字符流寫入到文件中
BufferedWriter serverwrite = new BufferedWriter(
new OutputStreamWriter(s.getOutputStream()));
if (line.equals(testconnect)) {
serverwrite.write("連接成功,你可以發送數據了,發送數據前,請先發送你要用的數據庫表名!");
serverwrite.newLine();
serverwrite.flush();
} else {
serverwrite.write("連接失敗!");
serverwrite.newLine();
serverwrite.flush();
}
// 准備接收表名和列族名
userTableName = serverread.readLine();
System.out.println("tablename:" + userTableName);// 輸出客戶端的連接請求
// 告訴客戶端,我接受成功
if (TableIsExist(userTableName)) {
serverwrite.write("接收表名成功");
serverwrite.newLine();
serverwrite.flush();
} else {
serverwrite.write("表不存在");
serverwrite.newLine();
serverwrite.flush();
}
// 循環讀取客戶端的數據
line = "";
StringBuffer temp = new StringBuffer(line);
while ((line = serverread.readLine()) != null) {
temp.append(line);
}
// 對json文件進行解析
JSONArray jsonArray = JSONArray.fromObject(temp.toString());
// 解析之后進行輸出,在這里可以直接寫入到hbase中
PrintJsonArray(jsonArray);
getAllTables(conf);
// 將接收到的數據寫入hbase中的表中
insertData(jsonArray, userTableName);
// 給出一個反饋,提示數據上傳成功
// 封裝通道內的輸出流,方便對他進行寫字符數據
BufferedWriter bwserver = new BufferedWriter(
new OutputStreamWriter(s.getOutputStream()));
bwserver.write("文件上傳成功!");
// bwserver.newLine();
bwserver.flush();
bwserver.close();
// 釋放資源
// serverwrite.close();
s.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@SuppressWarnings("deprecation")
private void insertData(JSONArray jsonArray, String userTableName) {
// TODO Auto-generated method stub
Table table = null;
try {
table = connection.getTable(TableName.valueOf(userTableName));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
List<Put> putlist = new ArrayList<Put>();
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonobject = jsonArray.getJSONObject(i);
Put put = new Put(Bytes.toBytes(jsonobject.getString("DocumentID")));// 指定行,也就是鍵值
// 參數分別:列族、列、值
put.add(Bytes.toBytes("info"),
Bytes.toBytes("DocumentContent"),
Bytes.toBytes(jsonobject.getString("DocumentContent")));
putlist.add(put);
}
try {
table.put(putlist);
table.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private boolean TableIsExist(String userTableName2) {
boolean flag = false;
try {
// Connection connection = ConnectionFactory.createConnection(conf);
Admin ad = connection.getAdmin();
if (ad.tableExists(TableName.valueOf(userTableName2))) {
flag = true;
System.out.println("表存在");
} else {
System.out.println("表不存在");
}
} catch (Exception e) {
// TODO: handle exception
}
return flag;
// TODO Auto-generated method stub
}
private void PrintJsonArray(JSONArray jsonArray) {
int size = jsonArray.size();
System.out.println("Size: " + size);
for (int i = 0; i < size; i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
System.out.println("[" + i + "]id=" + jsonObject.get("id"));
System.out.println("[" + i + "]name=" + jsonObject.get("name"));
System.out.println("[" + i + "]role=" + jsonObject.get("role"));
}
}
// create table
private void createTable(Configuration conf) {
// HBaseAdmin ha=new HBaseAdmin(conf);
try {
// Connection connection = ConnectionFactory.createConnection(conf);
Table table = connection.getTable(TableName.valueOf(userTableName));
Admin ad = connection.getAdmin();
// TableName name= TableName.valueOf(Bytes.toBytes(tablename));//表名
HTableDescriptor desc = new HTableDescriptor(table.getName());
HColumnDescriptor family = new HColumnDescriptor(
Bytes.toBytes(columnFamilyName));// 列簇
desc.addFamily(family);
ad.createTable(desc);
ad.close();
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
// Hbase獲取所有的表信息
public static List getAllTables(Configuration conf)
throws MasterNotRunningException, ZooKeeperConnectionException,
IOException {
HBaseAdmin ad = new HBaseAdmin(conf);
List<String> tables = null;
if (ad != null) {
try {
HTableDescriptor[] allTable = ad.listTables();
if (allTable.length > 0)
tables = new ArrayList<String>();
for (HTableDescriptor hTableDescriptor : allTable) {
tables.add(hTableDescriptor.getNameAsString());
System.out.println(hTableDescriptor.getNameAsString());
}
} catch (IOException e) {
e.printStackTrace();
}
}
return tables;
}
// 按順序輸出
public void printResult(Result rs) {
if (rs.isEmpty()) {
System.out.println("result is empty!");
return;
}
// new API and print Map of families to all versions of its qualifiers
// and values.
NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs
.getMap();
String rowkey = Bytes.toString(rs.getRow()); // actain rowkey
System.out.println("rowkey->" + rowkey);
for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps
.entrySet()) {
System.out.print("\tfamily->" + Bytes.toString(temp.getKey()));
for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp
.getValue().entrySet()) {
System.out.print("\tcol->" + Bytes.toString(value.getKey()));
for (Entry<Long, byte[]> va : value.getValue().entrySet()) {
System.out.print("\tvesion->" + va.getKey());
System.out.print("\tvalue->"
+ Bytes.toString(va.getValue()));
System.out.println();
}
}
}
}
}
(2)主函數的實現
package com.yiban.datacenter.ToHbaseFromJson;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
public class HbaseServer {
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
@SuppressWarnings("resource")
ServerSocket ss=new ServerSocket(22222);
while(true){
Socket s=ss.accept();
new Thread(new UserThread3(s)).start();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
