通過Java的Socket與ServerSocket以及多線程技術,實現多客戶端的數據傳輸,並將數據寫入HBase中


業務需求說明:由於公司數據中心處於剛開始部署的階段,需要其它各部分將數據全部匯總到數據中心。實現的方式是通過上傳json文件,並采用socket&serversocket進行傳輸。

其中,服務端采用多線程的方式,以支持多個用戶同時傳輸數據,並將接收到的數據寫入到hbase中。

具體步驟如下:

1、首先編寫客戶端的代碼:

package com.yiban.datacenter.ToHbaseFromJson;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.net.UnknownHostException;

/**
 * Command-line client that uploads a local JSON file to the ingestion server.
 *
 * <p>Protocol, one text line per step:
 * <ol>
 *   <li>send the handshake sentence, read the server's reply;</li>
 *   <li>send the target table name, read the server's reply;</li>
 *   <li>send the file line by line, then half-close the socket to signal
 *       end of data;</li>
 *   <li>print whatever the server sends back until it closes the
 *       connection.</li>
 * </ol>
 *
 * <p>Text is encoded with the platform default charset. The server compares
 * the handshake sentence with {@code equals}, so both ends must run with the
 * same default charset.
 */
public class hbaseclient {

	/** Address of the ingestion server. */
	private static final String SERVER_HOST = "192.168.27.47";

	/** TCP port the ingestion server listens on. */
	private static final int SERVER_PORT = 22222;

	/** Sentence the server expects as the first line of every session. */
	private static final String HANDSHAKE = "我准備向你發送數據了,你准備好接收了嗎?";

	/** Name of the HBase table the data is written to. */
	private static final String TABLE_NAME = "nihao";

	/** Path of the JSON file to upload. */
	private static final String JSON_FILE = "file.json";

	/**
	 * Runs one upload session and prints every server reply to standard output.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		// try-with-resources closes the socket and the file on every exit path.
		try (Socket s = new Socket(SERVER_HOST, SERVER_PORT);
				BufferedReader fileReader = new BufferedReader(
						new FileReader(JSON_FILE))) {

			// Exactly one reader and one writer per socket: a second
			// BufferedReader on the same stream could miss bytes that the
			// first one has already buffered.
			BufferedWriter out = new BufferedWriter(
					new OutputStreamWriter(s.getOutputStream()));
			BufferedReader in = new BufferedReader(
					new InputStreamReader(s.getInputStream()));

			// Step 1: handshake.
			sendLine(out, HANDSHAKE);
			System.out.println(readReply(in));

			// Step 2: table name.
			sendLine(out, TABLE_NAME);
			System.out.println(readReply(in));

			// Step 3: stream the file content.
			String line;
			while ((line = fileReader.readLine()) != null) {
				out.write(line);
				out.newLine();
			}
			// shutdownOutput() does not flush the BufferedWriter, so flush first.
			out.flush();
			// Half-close: tells the server that no more data will follow.
			s.shutdownOutput();

			// Step 4: print the server's feedback until it closes the connection.
			String feedback;
			while ((feedback = in.readLine()) != null) {
				System.out.println(feedback);
			}

		} catch (UnknownHostException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Writes one line to the server and flushes it immediately.
	 *
	 * @param out  writer wrapping the socket output stream
	 * @param text line content, without a line terminator
	 * @throws IOException if the write fails
	 */
	private static void sendLine(BufferedWriter out, String text)
			throws IOException {
		out.write(text);
		out.newLine();
		out.flush();
	}

	/**
	 * Reads one reply line from the server.
	 *
	 * @param in reader wrapping the socket input stream
	 * @return the reply line, never {@code null}
	 * @throws IOException if reading fails or the server closed the connection
	 *                     before replying
	 */
	private static String readReply(BufferedReader in) throws IOException {
		String reply = in.readLine();
		if (reply == null) {
			throw new IOException("server closed the connection before replying");
		}
		return reply;
	}
}

2、服務端的代碼:

(1)線程類的實現

package com.yiban.datacenter.ToHbaseFromJson;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Map.Entry;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Serves one client connection of the JSON-to-HBase ingestion server.
 *
 * <p>Session flow, one text line per step:
 * <ol>
 *   <li>read the handshake sentence and acknowledge or reject it;</li>
 *   <li>read the target table name and check that the table exists;</li>
 *   <li>read the JSON payload until the client half-closes the socket;</li>
 *   <li>write every array element into HBase and report the outcome.</li>
 * </ol>
 *
 * <p>The session ends, and the socket is closed, as soon as a step fails.
 */
public class UserThread implements Runnable {

	/** Sentence a client must send first; could later carry credentials. */
	private final String testconnect = "我准備向你發送數據了,你准備好接收了嗎?";

	/** Column family that {@link #insertData} writes to. */
	private static final String DEFAULT_COLUMN_FAMILY = "info";

	private static Configuration conf = HBaseConfiguration.create();

	/** Connection shared by all handler instances; {@code null} if setup failed. */
	private static Connection connection = null;

	// Point the client at the ZooKeeper quorum and open the shared connection.
	static {
		try {
			conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.27.233");
			conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
			connection = ConnectionFactory.createConnection(conf);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private Socket s;

	/**
	 * @param s accepted client socket; it is closed when {@link #run()} returns
	 */
	public UserThread(Socket s) {
		this.s = s;
	}

	private String userTableName = null;
	private String columnFamilyName = null;

	/**
	 * Runs the whole session for the client socket and always closes the
	 * socket afterwards, whether the session succeeded or not.
	 */
	@Override
	public void run() {
		// try-with-resources guarantees the socket is released on every path,
		// including runtime failures such as malformed JSON.
		try (Socket client = s) {
			// One reader and one writer for the whole session.
			BufferedReader serverread = new BufferedReader(
					new InputStreamReader(client.getInputStream()));
			BufferedWriter serverwrite = new BufferedWriter(
					new OutputStreamWriter(client.getOutputStream()));

			// Step 1: handshake. readLine() returns null when the client has
			// already disconnected, so compare from the constant's side.
			String line = serverread.readLine();
			System.out.println(line);
			if (!testconnect.equals(line)) {
				reply(serverwrite, "連接失敗!");
				return;
			}
			reply(serverwrite, "連接成功,你可以發送數據了,發送數據前,請先發送你要用的數據庫表名!");

			// Step 2: table name.
			userTableName = serverread.readLine();
			System.out.println("tablename:" + userTableName);
			if (userTableName == null || !TableIsExist(userTableName)) {
				// Nothing can be stored, so do not accept the payload.
				reply(serverwrite, "表不存在");
				return;
			}
			reply(serverwrite, "接收表名成功");

			// Step 3: payload. Lines are concatenated without separators.
			StringBuilder payload = new StringBuilder();
			while ((line = serverread.readLine()) != null) {
				payload.append(line);
			}

			// Step 4: parse, store, and report the real outcome.
			String outcome;
			try {
				JSONArray jsonArray = JSONArray.fromObject(payload.toString());
				PrintJsonArray(jsonArray);
				listTablesForDiagnostics();
				insertData(jsonArray, userTableName);
				outcome = "文件上傳成功!";
			} catch (IOException | RuntimeException e) {
				// RuntimeException covers malformed JSON and missing keys.
				e.printStackTrace();
				outcome = "文件上傳失敗!";
			}
			// No trailing newline: the client reads until the socket closes.
			serverwrite.write(outcome);
			serverwrite.flush();

		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Sends one line to the client and flushes it immediately.
	 *
	 * @param out  writer wrapping the socket output stream
	 * @param text line content, without a line terminator
	 * @throws IOException if the write fails
	 */
	private static void reply(BufferedWriter out, String text)
			throws IOException {
		out.write(text);
		out.newLine();
		out.flush();
	}

	/**
	 * Prints the table list as a diagnostic aid. A failure here is only
	 * logged, so that it cannot abort an otherwise valid upload.
	 */
	private void listTablesForDiagnostics() {
		try {
			getAllTables(conf);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Writes every element of the array into the given table: the row key is
	 * the element's {@code DocumentID}, and {@code DocumentContent} is stored
	 * in column {@code info:DocumentContent}.
	 *
	 * @param jsonArray     parsed payload; each element must be an object with
	 *                      the keys {@code DocumentID} and {@code DocumentContent}
	 * @param userTableName name of the destination table
	 * @throws IOException if the table cannot be opened or the write fails
	 */
	private void insertData(JSONArray jsonArray, String userTableName)
			throws IOException {
		byte[] family = Bytes.toBytes(DEFAULT_COLUMN_FAMILY);
		byte[] qualifier = Bytes.toBytes("DocumentContent");

		List<Put> putlist = new ArrayList<Put>(jsonArray.size());
		for (int i = 0; i < jsonArray.size(); i++) {
			JSONObject jsonobject = jsonArray.getJSONObject(i);
			Put put = new Put(Bytes.toBytes(jsonobject.getString("DocumentID")));
			put.addColumn(family, qualifier,
					Bytes.toBytes(jsonobject.getString("DocumentContent")));
			putlist.add(put);
		}
		if (putlist.isEmpty()) {
			return;
		}

		// The table handle is closed even when put() throws.
		try (Table table = connection.getTable(TableName.valueOf(userTableName))) {
			table.put(putlist);
		}
	}

	/**
	 * Checks whether a table exists.
	 *
	 * @param userTableName2 table name as received from the client
	 * @return {@code true} if the table exists; {@code false} if it does not
	 *         or if the check itself failed (the failure is logged)
	 */
	private boolean TableIsExist(String userTableName2) {
		try (Admin ad = connection.getAdmin()) {
			boolean exists = ad.tableExists(TableName.valueOf(userTableName2));
			System.out.println(exists ? "表存在" : "表不存在");
			return exists;
		} catch (Exception e) {
			// Broad on purpose: the name is client-supplied and may be rejected
			// as illegal, and the shared connection may be null if setup failed.
			e.printStackTrace();
			return false;
		}
	}

	/**
	 * Prints the array size and the {@code id}, {@code name} and {@code role}
	 * entries of every element to standard output.
	 *
	 * <p>NOTE(review): {@link #insertData} reads {@code DocumentID} and
	 * {@code DocumentContent}, not these keys - confirm which fields this
	 * debug output should show.
	 *
	 * @param jsonArray parsed payload
	 */
	private void PrintJsonArray(JSONArray jsonArray) {
		int size = jsonArray.size();
		System.out.println("Size: " + size);
		for (int i = 0; i < size; i++) {
			JSONObject jsonObject = jsonArray.getJSONObject(i);
			System.out.println("[" + i + "]id=" + jsonObject.get("id"));
			System.out.println("[" + i + "]name=" + jsonObject.get("name"));
			System.out.println("[" + i + "]role=" + jsonObject.get("role"));
		}
	}

	/**
	 * Creates the table named by {@code userTableName} with a single column
	 * family. The family is {@code columnFamilyName} when it has been set,
	 * otherwise the family that {@link #insertData} writes to.
	 *
	 * @param conf unused; kept for signature compatibility
	 */
	private void createTable(Configuration conf) {
		try (Admin ad = connection.getAdmin()) {
			HTableDescriptor desc = new HTableDescriptor(
					TableName.valueOf(userTableName));

			String familyName = columnFamilyName != null
					? columnFamilyName
					: DEFAULT_COLUMN_FAMILY;
			desc.addFamily(new HColumnDescriptor(Bytes.toBytes(familyName)));

			ad.createTable(desc);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * Lists the names of all tables and prints each one to standard output.
	 *
	 * @param conf configuration used to open a temporary connection
	 * @return the table names; empty when there are no tables, never
	 *         {@code null}
	 * @throws IOException if the connection cannot be opened or the listing
	 *                     fails
	 */
	public static List<String> getAllTables(Configuration conf)
			throws MasterNotRunningException, ZooKeeperConnectionException,
			IOException {

		List<String> tables = new ArrayList<String>();
		// Both handles are temporary and are closed before returning.
		try (Connection conn = ConnectionFactory.createConnection(conf);
				Admin ad = conn.getAdmin()) {
			for (HTableDescriptor hTableDescriptor : ad.listTables()) {
				String name = hTableDescriptor.getNameAsString();
				tables.add(name);
				System.out.println(name);
			}
		}
		return tables;
	}

	/**
	 * Prints a result row to standard output: the row key, then every
	 * family, column, version and value in map iteration order.
	 *
	 * @param rs the result to print; an empty result prints a notice only
	 */
	public void printResult(Result rs) {
		if (rs.isEmpty()) {
			System.out.println("result is empty!");
			return;
		}
		// Map of family -> qualifier -> version -> value.
		NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temps = rs
				.getMap();
		String rowkey = Bytes.toString(rs.getRow());
		System.out.println("rowkey->" + rowkey);
		for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> temp : temps
				.entrySet()) {
			System.out.print("\tfamily->" + Bytes.toString(temp.getKey()));
			for (Entry<byte[], NavigableMap<Long, byte[]>> value : temp
					.getValue().entrySet()) {
				System.out.print("\tcol->" + Bytes.toString(value.getKey()));
				for (Entry<Long, byte[]> va : value.getValue().entrySet()) {
					System.out.print("\tvesion->" + va.getKey());
					System.out.print("\tvalue->"
							+ Bytes.toString(va.getValue()));
					System.out.println();
				}
			}
		}
	}

}

 (2)主函數的實現

package com.yiban.datacenter.ToHbaseFromJson;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

/**
 * Entry point of the ingestion server: accepts client connections on a fixed
 * TCP port and hands each one to its own {@link UserThread}.
 */
public class HbaseServer {

	/** TCP port the server listens on. */
	private static final int PORT = 22222;

	/**
	 * Accepts connections until accepting fails, starting one handler thread
	 * per client.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		// try-with-resources releases the listening port when the loop ends
		// through an exception.
		try (ServerSocket ss = new ServerSocket(PORT)) {
			while (true) {
				Socket s = ss.accept();
				// The worker class shown for this server is UserThread; the
				// handler owns the socket from here on.
				new Thread(new UserThread(s)).start();
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM