使用Java API連接和操作HBase數據庫


創建的數據庫存儲如下數據

插入數據示意圖完整版

表結構

image

java代碼

  1 
  2 public class HbaseTest {
  3 
  4 	/**
 5 	 * 配置ss
 6 	 */
  7 	static Configuration config = null;
  8 	private Connection connection = null;
  9 	private Table table = null;
 10 
 11 	@Before
 12 	public void init() throws Exception {
 13 		config = HBaseConfiguration.create();// 配置
 14 		config.set("hbase.zookeeper.quorum", "192.168.33.61");// zookeeper地址
 15 		config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
 16 		connection = ConnectionFactory.createConnection(config);
 17 		table = connection.getTable(TableName.valueOf("dept"));
 18 	}
 19 
 20 	/**
 21 	 * 創建數據庫表dept,並增加列族info和subdept
 22 	 *
 23 	 * @throws Exception
 24 	 */
 25 	@Test
 26 	public void createTable() throws Exception {
 27 		// 創建表管理類
 28 		HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
 29 		// 創建表描述類
 30 		TableName tableName = TableName.valueOf("dept"); // 表名稱
 31 		HTableDescriptor desc = new HTableDescriptor(tableName);
 32 		// 創建列族的描述類
 33 		HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
 34 		// 將列族添加到表中
 35 		desc.addFamily(family);
 36 		HColumnDescriptor family2 = new HColumnDescriptor("subdept"); // 列族
 37 		// 將列族添加到表中
 38 		desc.addFamily(family2);
 39 		// 創建表
 40 		admin.createTable(desc); // 創建表
 41 		System.out.println("創建表成功!");
 42 	}
 43 
 44 	/**
 45 	 * 向hbase中插入前三行網絡部、開發部、測試部的相關數據,
 46 	 * 即加入表中的前三條數據
 47 	 *
 48 	 * @throws Exception
 49 	 */
 50 	@SuppressWarnings({ "deprecation", "resource" })
 51 	@Test
 52 	public void insertData() throws Exception {
 53 		table.setAutoFlushTo(false);
 54 		table.setWriteBufferSize(534534534);
 55 		ArrayList<Put> arrayList = new ArrayList<Put>();
 56 
 57 		Put put = new Put(Bytes.toBytes("0_1"));
 58 		put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("網絡部"));
 59 		put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept1"), Bytes.toBytes("1_1"));
 60 		put.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept2"), Bytes.toBytes("1_2"));
 61 		arrayList.add(put);
 62 
 63 		Put put1 = new Put(Bytes.toBytes("1_1"));
 64 		put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發部"));
 65 		put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
 66 
 67 		Put put2 = new Put(Bytes.toBytes("1_2"));
 68 		put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("測試部"));
 69 		put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
 70 
 71 		for (int i = 1; i <= 100; i++) {
 72 
 73 			put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("2_"+i));
 74 			put2.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept"+i), Bytes.toBytes("3_"+i));
 75 		}
 76 		arrayList.add(put1);
 77 		arrayList.add(put2);
 78 		//插入數據
 79 		table.put(arrayList);
 80 		//提交
 81 		table.flushCommits();
 82 		System.out.println("數據插入成功!");
 83 	}
 84 
 85 	/**
 86 	 * 向hbase中插入開發部、測試部下的所有子部門數據
 87 	 * @throws Exception
 88 	 */
 89 	@Test
 90 	public void insertOtherData() throws Exception {
 91 		table.setAutoFlushTo(false);
 92 		table.setWriteBufferSize(534534534);
 93 		ArrayList<Put> arrayList = new ArrayList<Put>();
 94 		for (int i = 1; i <= 100; i++) {
 95 			Put put_development = new Put(Bytes.toBytes("2_"+i));
 96 			put_development.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發"+i+""));
 97 			put_development.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_1"));
 98 			arrayList.add(put_development);
 99 
100 			Put put_test = new Put(Bytes.toBytes("3_"+i));
101 			put_test.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("測試"+i+""));
102 			put_test.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("1_2"));
103 			arrayList.add(put_test);
104 		}
105 
106 		//插入數據
107 		table.put(arrayList);
108 		//提交
109 		table.flushCommits();
110 		System.out.println("插入其他數據成功!");
111 	}
112 
113 	/**
114 	 * 查詢所有一級部門(沒有上級部門的部門)
115 	 * @throws Exception
116 	 */
117 	@Test
118 	public void scanDataStep1() throws Exception {
119 
120 		// 創建全表掃描的scan
121 		Scan scan = new Scan();
122 		System.out.println("查詢到的所有一級部門如下:");
123 		// 打印結果集
124 		ResultScanner scanner = table.getScanner(scan);
125 		for (Result result : scanner) {
126 			if (result.getValue(Bytes.toBytes("info"), Bytes.toBytes("f_pid")) == null) {
127 				for (KeyValue kv : result.raw()) {
128 				     System.out.print(new String(kv.getRow()) + " ");
129 				     System.out.print(new String(kv.getFamily()) + ":");
130 				     System.out.print(new String(kv.getQualifier()) + " = ");
131 				     System.out.print(new String(kv.getValue()));
132 				     System.out.print(" timestamp = " + kv.getTimestamp() + "\n");
133 				}
134 		}
135 		}
136 	}
137 
138 	/**
139 	 * 已知rowkey,查詢該部門的所有(直接)子部門信息 rowkey=1_1
140 	 * @throws Exception
141 	 */
142 	@Test
143 	public void scanDataStep2() throws Exception {
144 		Get g = new Get("1_1".getBytes());
145 		g.addFamily("subdept".getBytes());
146 		// 打印結果集
147 		Result result = table.get(g);
148 		for (KeyValue kv : result.raw()) {
149 		     Get g1 = new Get(kv.getValue());
150 		     Result result1 = table.get(g1);
151 		     for (KeyValue kv1 : result1.raw()) {
152 			     System.out.print(new String(kv1.getRow()) + " ");
153 			     System.out.print(new String(kv1.getFamily()) + ":");
154 			     System.out.print(new String(kv1.getQualifier()) + " = ");
155 			     System.out.print(new String(kv1.getValue()));
156 			     System.out.print(" timestamp = " + kv1.getTimestamp() + "\n");
157 			}
158 		}
159 	}
160 
161 	/**
162 	 * 已知rowkey,向該部門增加一個子部門
163 	 * rowkey:0_1
164 	 * 增加的部門名:我增加的部門
165 	 * @throws Exception
166 	 */
167 	@Test
168 	public void scanDataStep3() throws Exception {
169 		//新增一個部門
170 		Put put = new Put(Bytes.toBytes("4_1"));
171 		put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("我增加的部門"));
172 		put.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("0_1"));
173 		//插入數據
174 		table.put(put);
175 		//提交
176 		table.flushCommits();
177 
178 		//更新網絡部
179 		Put put1 = new Put(Bytes.toBytes("0_1"));
180 		put1.add(Bytes.toBytes("subdept"), Bytes.toBytes("subdept3"), Bytes.toBytes("4_1"));
181 		//插入數據
182 		table.put(put1);
183 		//提交
184 		table.flushCommits();
185 	}
186 
187 	/**
188 	 * 已知rowkey(且該部門存在子部門),刪除該部門信息,該部門所有(直接)子部門被調整到其他部門中
189 	 * @throws Exception
190 	 */
191 	@Test
192 	public void scanDataStep4() throws Exception {
193 		/**
194 		 * 向部門"我增加的部門"添加兩個子部門"
195 		 */
196 		table.setAutoFlushTo(false);
197 		table.setWriteBufferSize(534534534);
198 		ArrayList<Put> arrayList = new ArrayList<Put>();
199 		Put put1 = new Put(Bytes.toBytes("5_1"));
200 		put1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部門1"));
201 		put1.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1"));
202 		Put put2 = new Put(Bytes.toBytes("5_2"));
203 		put2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("新增子部門2"));
204 		put2.add(Bytes.toBytes("info"), Bytes.toBytes("f_pid"), Bytes.toBytes("4_1"));
205 
206 		arrayList.add(put1);
207 		arrayList.add(put2);
208 		//插入數據
209 		table.put(arrayList);
210 		//提交
211 		table.flushCommits();
212 
213 		/**
214 		 * 目的:刪除"我增加的部門"的部門信息,該部門所有(直接)子部門被調整到其他部門中
215 		 * 使用策略:更新部門名就可以了,也就是說一個部門可能有多個rowkey
216 		 */
217 		Put put = new Put(Bytes.toBytes("4_1"));
218 		put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("開發部"));
219 		//插入數據
220 		table.put(put);
221 		//提交
222 		table.flushCommits();
223 	}
224 
225 	@After
226 	public void close() throws Exception {
227 		table.close();
228 		connection.close();
229 	}
230 
231 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM