一. 需求分析
1) 微博內容的瀏覽,數據庫表設計
2) 用戶社交體現:關注用戶,取關用戶
3) 拉取關注的人的微博內容
二. 代碼實現
代碼設計總覽:
1.創建命名空間以及表名的定義
//獲取配置 conf private Configuration conf = HBaseConfiguration.create(); //微博內容表的表名 private static final byte[] TABLE_CONTENT = Bytes.toBytes("ns_weibo:content"); //用戶關系表的表名 private static final byte[] TABLE_RELATION = Bytes.toBytes("ns_weibo:relation"); //微博收件箱表的表名 private static final byte[] TABLE_INBOX = Bytes.toBytes("ns_weibo:inbox"); /** *初始化命名空間 *@param args */ public void initNamespace(){ HBaseAdmin admin = null; try { Connection connection = ConnectionFactory.createConnection(conf); admin = (HBaseAdmin) connection.getAdmin(); //命名空間類似於關系型數據庫中的 schema,可以想象成文件夾 NamespaceDescriptor weibo = NamespaceDescriptor .create("ns_weibo") .addConfiguration("creator", "Jinji") .addConfiguration("create_time", System.currentTimeMillis() + "") .build(); admin.createNamespace(weibo); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ if(null != admin){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); }
2. 創建微博內容表
表結構
方法名 |
creatTableeContent |
Table Name |
ns_weibo:content |
RowKey |
用戶 ID_時間戳 |
ColumnFamily |
info |
ColumnLabel |
標題,內容,圖片 |
Version |
1 個版本 |
代碼
/** *創建微博內容表 *Table Name:ns_weibo:content *RowKey:用戶 ID_時間戳 *ColumnFamily:info *ColumnLabel:標題,內容,圖片 URL *Version:1 個版本 */ public void createTableContent(){ HBaseAdmin admin = null; Connection connection = null; try { connection = ConnectionFactory.createConnection(conf); admin = (HBaseAdmin) connection.getAdmin(); //創建表表述 HTableDescriptor contentTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT)); //創建列族描述 HColumnDescriptor infoColumnDescriptor = new HColumnDescriptor(Bytes.toBytes("info")); //設置塊緩存 infoColumnDescriptor.setBlockCacheEnabled(true); //設置塊緩存大小 infoColumnDescriptor.setBlocksize(2097152); //設置壓縮方式 // infoColumnDescriptor.setCompressionType(Algorithm.SNAPPY); //設置版本確界 infoColumnDescriptor.setMaxVersions(1); infoColumnDescriptor.setMinVersions(1); contentTableDescriptor.addFamily(infoColumnDescriptor); admin.createTable(contentTableDescriptor); } catch (IOException e) { e.printStackTrace(); } finally{ if(null != admin){ try { admin.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
3.創建用戶關系表
表結構
方法名 |
createTableRelations |
Table Name |
ns_weibo:relation |
RowKey |
用戶 ID |
ColumnFamily |
attends、fans |
ColumnLabel |
關注用戶 ID,粉絲用戶 ID |
ColumnValue |
用戶 ID |
Version |
1 個版本 |
代碼:
/** *用戶關系表 *Table Name:ns_weibo:relation *RowKey:用戶 ID *ColumnFamily:attends,fans *ColumnLabel:關注用戶 ID,粉絲用戶 ID *ColumnValue:用戶 ID *Version:1 個版本 */ public void createTableRelation(){ HBaseAdmin admin = null; try { Connection connection = ConnectionFactory.createConnection(conf); admin = (HBaseAdmin) connection.getAdmin(); HTableDescriptor relationTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_RELATION)); //關注的人的列族 HColumnDescriptor attendColumnDescriptor = new HColumnDescriptor(Bytes.toBytes("attends")); //設置塊緩存 attendColumnDescriptor.setBlockCacheEnabled(true); //設置塊緩存大小 attendColumnDescriptor.setBlocksize(2097152); //設置壓縮方式 //attendColumnDescriptor.setCompressionType(Algorithm.SNAPPY); //設置版本確界 attendColumnDescriptor.setMaxVersions(1); attendColumnDescriptor.setMinVersions(1); //粉絲列族 HColumnDescriptor fansColumnDescriptor = new HColumnDescriptor(Bytes.toBytes("fans"));
fansColumnDescriptor.setBlockCacheEnabled(true);
fansColumnDescriptor.setBlocksize(2097152);
fansColumnDescriptor.setMaxVersions(1);
fansColumnDescriptor.setMinVersions(1); relationTableDescriptor.addFamily(attendColumnDescriptor);
relationTableDescriptor.addFamily(fansColumnDescriptor);
admin.createTable(relationTableDescriptor); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ if(null != admin){ try { admin.close(); } catch (IOException e)
{ e.printStackTrace(); }
}
4.創建用戶微博內容接收郵件表
表結構:
方法名 |
createTableInbox |
Table Name |
ns_weibo:inbox |
RowKey |
用戶 ID |
ColumnFamily |
info |
ColumnLabel |
用戶 ID |
ColumnValue |
取微博內容的 RowKey |
Version |
1000 |
代碼:
/** *創建微博收件箱表 *Table Name: ns_weibo:inbox *RowKey:用戶 ID *ColumnFamily:info *ColumnLabel:用戶 ID_發布微博的人的用戶 ID *ColumnValue:關注的人的微博的 RowKey *Version:1000 */ public void createTableInbox(){ HBaseAdmin admin = null; try { Connection connection = ConnectionFactory.createConnection(conf); admin = (HBaseAdmin) connection.getAdmin(); HTableDescriptor inboxTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_INBOX)); HColumnDescriptor infoColumnDescriptor = new HColumnDescriptor(Bytes.toBytes("info")); infoColumnDescriptor.setBlockCacheEnabled(true);
infoColumnDescriptor.setBlocksize(2097152);
infoColumnDescriptor.setMaxVersions(1000);
infoColumnDescriptor.setMinVersions(1000); inboxTableDescriptor.addFamily(infoColumnDescriptor);
admin.createTable(inboxTableDescriptor); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ if(null != admin){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); }
5.發布微博內容
a、微博內容表中添加 1 條數據
b、微博收件箱表對所有粉絲用戶添加數據代碼:Message.java
package com.z.hbase.weibo; public class Message { private String uid; private String timestamp; private String content; public String getUid() { return uid; } public void setUid(String uid) { this.uid = uid; } public String getTimestamp() { return timestamp; } public void setTimestamp(String timestamp) { this.timestamp = timestamp; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public String toString() { return "Message [uid=" + uid + ", timestamp=" + timestamp + ", content=" + content +"]"; } }
代碼:public void publishContent(String uid, String content)
/** *發布微博 *a、微博內容表中數據+1 *b、向微博收件箱表中加入微博的 Rowkey */ public void publishContent(String uid, String content){ Connection connection = null; try { connection = ConnectionFactory.createConnection(conf); //a、微博內容表中添加 1 條數據,首先獲取微博內容表描述 Table contentTable = connection.getTable(TableName.valueOf(TABLE_CONTENT)); //組裝 Rowkey long timestamp = System.currentTimeMillis(); String rowKey = uid + "_" + timestamp; //添加微博內容 Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), timestamp, Bytes.toBytes(content)); contentTable.put(put); //b、向微博收件箱表中加入發布的 Rowkey //b.1、查詢用戶關系表,得到當前用戶有哪些粉絲 Table relationTable = connection.getTable(TableName.valueOf(TABLE_RELATION)); //b.2、取出目標數據 Get get = new Get(Bytes.toBytes(uid)); get.addFamily(Bytes.toBytes("fans")); Result result = relationTable.get(get); List<byte[]> fans = new ArrayList<byte[]>(); //遍歷取出當前發布微博的用戶的所有粉絲數據for(Cell cell : result.rawCells()){ fans.add(CellUtil.cloneQualifier(cell)); } //如果該用戶沒有粉絲,則直接 return if(fans.size() <= 0) return; //開始操作收件箱表 Table inboxTable = connection.getTable(TableName.valueOf(TABLE_INBOX)); //每一個粉絲,都要向收件箱中添加該微博的內容,所以每一個粉絲都是一個 Put 對象 List<Put> puts = new ArrayList<Put>(); for(byte[] fan : fans){ Put fansPut = new Put(fan); fansPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey)); puts.add(fansPut); } inboxTable.put(puts); } catch (IOException e) { e.printStackTrace(); }finally{ if(null != connection){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } }
6.添加關注用戶
a、在微博用戶關系表中,對當前主動操作的用戶添加新關注的好友
b、在微博用戶關系表中,對被關注的用戶添加新的粉絲
c、微博收件箱表中添加所關注的用戶發布的微博
代碼實現:public void addAttends(String uid, String... attends)
/** *關注用戶邏輯 *a、在微博用戶關系表中,對當前主動操作的用戶添加新的關注的好友 *b、在微博用戶關系表中,對被關注的用戶添加粉絲(當前操作的用戶) *c、當前操作用戶的微博收件箱添加所關注的用戶發布的微博 rowkey */ public void addAttends(String uid, String... attends){ //參數過濾 if(attends == null || attends.length <= 0 || uid == null || uid.length() <= 0){ return; } Connection connection = null; try { connection = ConnectionFactory.createConnection(conf); //用戶關系表操作對象(連接到用戶關系表) Table relationTable = connection.getTable(TableName.valueOf(TABLE_RELATION)); List<Put> puts = new ArrayList<Put>(); //a、在微博用戶關系表中,添加新關注的好友 Put attendPut = new Put(Bytes.toBytes(uid)); for(String attend : attends){ //為當前用戶添加關注的人 attendPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend)); //b、為被關注的人,添加粉絲 Put fansPut = new Put(Bytes.toBytes(attend)); fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid)); //將所有關注的人一個一個的添加到 puts(List)集合中 puts.add(fansPut); } puts.add(attendPut); relationTable.put(puts); //c.1、微博收件箱添加關注的用戶發布的微博內容(content)的 rowkey Table contentTable = connection.getTable(TableName.valueOf(TABLE_CONTENT)); Scan scan = new Scan(); //用於存放取出來的關注的人所發布的微博的 rowkey List<byte[]> rowkeys = new ArrayList<byte[]>(); for(String attend : attends){ //過濾掃描 rowkey,即:前置位匹配被關注的人的 uid_ RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_")); //為掃描對象指定過濾規則 scan.setFilter(filter); //通過掃描對象得到 scanner ResultScanner result = contentTable.getScanner(scan); //迭代器遍歷掃描出來的結果集 Iterator<Result> iterator = result.iterator(); while(iterator.hasNext()){ //取出每一個符合掃描結果的那一行數據 Result r = iterator.next(); for(Cell cell : r.rawCells()){ //將得到的 rowkey 放置於集合容器中 rowkeys.add(CellUtil.cloneRow(cell)); } } } //c.2、將取出的微博 rowkey 放置於當前操作的用戶的收件箱中 if(rowkeys.size() <= 0) return; //得到微博收件箱表的操作對象 Table inboxTable = connection.getTable(TableName.valueOf(TABLE_INBOX)); /用於存放多個關注的用戶的發布的多條微博 rowkey 信息 List<Put> inboxPutList = new ArrayList<Put>(); for(byte[] rk : rowkeys){ Put put = new Put(Bytes.toBytes(uid)); //uid_timestamp String rowKey = Bytes.toString(rk); //截取 uid String attendUID = rowKey.substring(0, rowKey.indexOf("_")); long timestamp = Long.parseLong(rowKey.substring(rowKey.indexOf("_") + 1)); //將微博 rowkey 添加到指定單元格中 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(attendUID), timestamp, rk); inboxPutList.add(put); } inboxTable.put(inboxPutList); } catch (IOException e) { e.printStackTrace(); }finally{ if(null != connection){ try { connection.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
7.移除(取關)用戶
a、在微博用戶關系表中,對當前主動操作的用戶移除取關的好友(attends)
b、在微博用戶關系表中,對被取關的用戶移除粉絲
c、微博收件箱中刪除取關的用戶發布的微博
代碼:public void removeAttends(String uid, String... attends)
/** *取消關注(remove) *a、在微博用戶關系表中,對當前主動操作的用戶刪除對應取關的好友 *b、在微博用戶關系表中,對被取消關注的人刪除粉絲(當前操作人) *c、從收件箱中,刪除取關的人的微博的 rowkey * */ public void removeAttends(String uid, String... attends){ //過濾數據 if(uid == null || uid.length() <= 0 || attends == null || attends.length <= 0) return; try { Connection connection = ConnectionFactory.createConnection(conf); //a、在微博用戶關系表中,刪除已關注的好友 Table relationTable = connection.getTable(TableName.valueOf(TABLE_RELATION)); //待刪除的用戶關系表中的所有數據 List<Delete> deleteList = new ArrayList<Delete>(); //當前取關操作者的 uid 對應的 Delete 對象 Delete attendDelete = new Delete(Bytes.toBytes(uid)); //遍歷取關,同時每次取關都要將被取關的人的粉絲-1 for(String attend : attends){ attendDelete.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend)); //b、在微博用戶關系表中,對被取消關注的人刪除粉絲(當前操作人) Delete fansDelete = new Delete(Bytes.toBytes(attend)); fansDelete.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid)); deleteList.add(fansDelete); } deleteList.add(attendDelete); relationTable.delete(deleteList); //c、刪除取關的人的微博 rowkey 從 收件箱表中 Table inboxTable = connection.getTable(TableName.valueOf(TABLE_INBOX)); Delete inboxDelete = new Delete(Bytes.toBytes(uid)); for(String attend : attends){ inboxDelete.addColumn(Bytes.toBytes("info"), Bytes.toBytes(attend)); } inboxTable.delete(inboxDelete) } catch (IOException e) { e.printStackTrace(); } }
8.獲取關注的人的微博內容
a、從微博收件箱中獲取所關注的用戶的微博 RowKey
b、根據獲取的 RowKey,得到微博內容
代碼實現:public List<Message> getAttendsContent(String uid)
/** * 獲取微博實際內容 *a、從微博收件箱中獲取所有關注的人的發布的微博的 rowkey *b、根據得到的 rowkey 去微博內容表中得到數據 *c、將得到的數據封裝到 Message 對象中 */ public List<Message> getAttendsContent(String uid){ Connection connection = null; try { connection = ConnectionFactory.createConnection(conf) Table inboxTable = connection.getTable(TableName.valueOf(TABLE_INBOX)); //a、從收件箱中取得微博 rowKey Get get = new Get(Bytes.toBytes(uid)); //設置最大版本號 get.setMaxVersions(5); List<byte[]> rowkeys = new ArrayList<byte[]>(); Result result = inboxTable.get(get); for(Cell cell : result.rawCells()){ rowkeys.add(CellUtil.cloneValue(cell)); } //b、根據取出的所有 rowkey 去微博內容表中檢索數據 Table contentTable = connection.getTable(TableName.valueOf(TABLE_CONTENT)); List<Get> gets = new ArrayList<Get>(); //根據 rowkey 取出對應微博的具體內容 for(byte[] rk : rowkeys){ Get g = new Get(rk); gets.add(g); } //得到所有的微博內容的 result 對象 Result[] results = contentTable.get(gets); //將每一條微博內容都封裝為消息對象 List<Message> messages = new ArrayList<Message>(); for(Result res : results){ for(Cell cell : res.rawCells()){ Message message = new Message(); String rowKey = Bytes.toString(CellUtil.cloneRow(cell)); String userid = rowKey.substring(0, rowKey.indexOf("_")); String timestamp = rowKey.substring(rowKey.indexOf("_") + 1); String content = Bytes.toString(CellUtil.cloneValue(cell)); message.setContent(content); message.setTimestamp(timestamp); message.setUid(userid); messages.add(message) } } return messages; } catch (IOException e) { e.printStackTrace(); }finally{ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } return null; }
9.測試
-- 測試發布微博內容
public void testPublishContent(WeiBo wb)
-- 測試添加關注
public void testAddAttend(WeiBo wb)
-- 測試取消關注
public void testRemoveAttend(WeiBo wb)
-- 測試展示內容
public void testShowMessage(WeiBo wb)
代碼:
/** *發布微博內容 *添加關注 *取消關注 *展示內容 */ public void testPublishContent(WeiBo wb){ wb.publishContent("0001", "今天買了一包空氣,送了點薯片,非常開心!!"); wb.publishContent("0001", "今天天氣不錯。"); } public void testAddAttend(WeiBo wb){ wb.publishContent("0008", "准備下課!"); wb.publishContent("0009", "准備關機!"); wb.addAttends("0001", "0008", "0009"); } public void testRemoveAttend(WeiBo wb){ wb.removeAttends("0001", "0008"); } public void testShowMessage(WeiBo wb){ List<Message> messages = wb.getAttendsContent("0001"); for(Message message : messages){ System.out.println(message); } } public static void main(String[] args) {
WeiBo weibo = new WeiBo();
weibo.initTable(); weibo.testPublishContent(weibo);
weibo.testAddAttend(weibo);
weibo.testShowMessage(weibo);
weibo.testRemoveAttend(weibo);
weibo.testShowMessage(weibo); }