Weibo类
这个类中放了各种具体的业务逻辑操作
package com.atguigu.weibo;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Weibo {
public static void main(String[] args) throws IOException {
// 程序的入口, 做各种测试用
createNamespace();
createContentTable();
createRelationTable();
createInboxTable();
// addAttend("1001", "1002");
// addAttend("1001", "1003");
// addAttend("1002", "1003");
// addData("1002", "1002->1");
// addData("1002", "1002 ---> 2");
// addData("1002", "1002 ---> 3");
// addData("1002", "1002 ---> 4");
// addData("1002", "1002 ---> 5");
// addData("1003", "1003 ---> 1");
showMsg("1001");
// addData("1003", "1003 ---> 3");
removeAttends("1001", "1003");
showMsg("1001");
}
/**
* 创建命名空间
*
* @throws IOException
*/
public static void createNamespace() throws IOException {
WeiboUtil.createNameSpace(WeiboConstant.NAMESPACE);
}
/**
* 创建内容表
*
* @throws IOException
*/
public static void createContentTable() throws IOException {
WeiboUtil.createTable(WeiboConstant.TBL_CONTENT, 1, "info");
}
/**
* 创建用户关系表
*
* @throws IOException
*/
public static void createRelationTable() throws IOException {
WeiboUtil.createTable(WeiboConstant.TBL_RELATION, 1, "attends", "fans");
}
/**
* 创建收件箱表
*
* @throws IOException
*/
public static void createInboxTable() throws IOException {
WeiboUtil.createTable(WeiboConstant.TBL_INBOX, 3, "info");
}
/**
* 发布微博
* 需要做两件事:
* 1. 把数据存入到指定用户内容表中
* 2. 把这条微博推送到收件箱表中
*
* @param uid 该条微博的uid
* @param content 该条微博的文本内容
*/
public static void addData(String uid, String content) throws IOException {
Connection conn = ConnectionFactory.createConnection(WeiboUtil.conf);
// 根据用户id, 为每一条微博生成一个唯一的 rowKey
String rowKey = uid + "_" + System.currentTimeMillis();
// 1. 把数据存入到内容表中
Table contentTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_CONTENT));
Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content));
contentTable.put(put);
// 2. 把这条微博推送到收件箱表中
//2.1 在用户关系表中找到操作者的所有粉丝
List<byte[]> fansUids = new ArrayList<>(); // 存储这个用户的所有的粉丝的 uid
Table relationTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_RELATION));
Get get = new Get(Bytes.toBytes(uid));
get.addFamily(Bytes.toBytes("fans"));
Result result = relationTable.get(get);
List<Cell> cells = result.listCells();
if (cells != null) {
for (Cell cell : cells) {
fansUids.add(CellUtil.cloneQualifier(cell));
}
}
//2.2 推送到收件箱表中 行就是粉丝的id
Table inboxTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_INBOX));
for (byte[] fansUid : fansUids) {
Put put1 = new Put(fansUid); // 收件箱中的rowkey就是粉丝的id
// 列族: info, 列: 用户id, 值: 每条微博的rowkew
put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), Bytes.toBytes(rowKey));
inboxTable.put(put1);
}
// 关闭资源
contentTable.close();
relationTable.close();
inboxTable.close();
conn.close();
}
/**
* 关注用户 功能
* <p>
* 1. 在用户关系表中 uid(关注人) 这一行 attends 族中添加一列: 列名就是attendedUid(被关注人), 列值可以不需要
* 2. 在用户关系表中 attendedUid(被关注人) 这一行的 fans 族中添加一列: 列名是 uid(关注人,其实就是粉丝), 列值可以不需要
* 3. 在收件箱表中 uid(关注人, 是别人的粉丝) 这一行, info 族添加一列: 列名是 attendedUid, 列值就是 attendedUid 的最新的三条微博.
*
* @param uid 需要关注用户的用户id
* @param attendedUid 被关注的用户的id
*/
public static void addAttend(String uid, String attendedUid) throws IOException {
Connection conn = ConnectionFactory.createConnection(WeiboUtil.conf);
/*1. 在用户关系表中 uid(关注人) 这一行 attends 族中添加一列: 列名就是attendedUid(被关注人), 列值可以不需要*/
Table relationTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_RELATION));
Put put1 = new Put(Bytes.toBytes(uid));
put1.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attendedUid), Bytes.toBytes(""));
relationTable.put(put1);
/*2. 在用户关系表中 attendedUid(被关注人) 这一行的 fans 族中添加一列: 列名是 uid(关注人,其实就是粉丝), 列值可以不需要*/
Put put2 = new Put(Bytes.toBytes(attendedUid));
put2.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(""));
relationTable.put(put2);
/*3. 在收件箱表中 uid(关注人, 是别人的粉丝) 这一行, info 族添加一列: 列名是 attendedUid, 列值就是 attendedUid的最新的三条微博的rowkey.*/
Table inboxTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_INBOX));
Table contentTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_CONTENT));
Scan scan = new Scan(Bytes.toBytes(attendedUid), Bytes.toBytes(attendedUid + "_a"));// 扫描出attendedUid_......的所有微博
ResultScanner results = contentTable.getScanner(scan);
for (Result result : results) { // 即使有多条微博, 最终也会只保留 3 条
byte[] row = result.getRow();
Put put3 = new Put(Bytes.toBytes(attendedUid));
put3.addColumn(
Bytes.toBytes("info"),
Bytes.toBytes(attendedUid),
Long.parseLong(Bytes.toString(row).split("_")[1]), // 时间戳保持和添加微博时候的时间戳一致
row);
inboxTable.put(put3);
}
}
/**
* 显示指定用户的初始化信息:
* 1. 自己的最新的 3 条微博
* <p>
* 2. 每个关注的人的最新 3 条微博
*
* @param uid
*/
public static void showMsg(String uid) throws IOException {
Connection conn = ConnectionFactory.createConnection(WeiboUtil.conf);
Table contentTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_CONTENT));
Table inboxTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_INBOX));
Table relationTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_RELATION));
/*1. 显示操作者的条微博*/
Scan scan1 = new Scan(Bytes.toBytes(uid), Bytes.toBytes(uid + "_a"));
ResultScanner results = contentTable.getScanner(scan1);
System.out.println("操作者" + uid + "的所有微博");
for (Result result : results) {
List<Cell> cells = result.listCells();
System.out.println(Bytes.toString(CellUtil.cloneRow(cells.get(0))) + " : " + Bytes.toString(CellUtil.cloneValue(cells.get(0))));
}
/*2. 显示操作者关注的所有人的最新 3 条微博 在收件箱中查找*/
Get inboxGet = new Get(Bytes.toBytes(uid));
inboxGet.addFamily(Bytes.toBytes("info"));
inboxGet.setMaxVersions(3);
Result result = inboxTable.get(inboxGet);
Cell[] cells = result.rawCells();
for (Cell cell : cells) {
byte[] rowKey = CellUtil.cloneRow(cell);
System.out.println(Bytes.toString(rowKey) + "关注的 : " + Bytes.toString(CellUtil.cloneQualifier(cell)) + " 的微博:" + Bytes.toString(CellUtil.cloneValue(cell)));
}
}
/**
* 取消关注:
* 1. 用户关系表中 uid这行 的attends族中删除attendId这列
*
* 2. 用户关系表中 attendId 这行的 fans族中删除 uid 这列
*
* 3. 收件箱表中 uid 这行 删除 attendId 这列
*
* @param uid 操作的用户
* @param attendId 取消关注谁
*/
public static void removeAttends(String uid, String attendId) throws IOException {
Connection conn = ConnectionFactory.createConnection(WeiboUtil.conf);
Table relationTab = conn.getTable(TableName.valueOf(WeiboConstant.TBL_RELATION));
Table inboxTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_INBOX));
List<Delete> dels = new ArrayList<>();
/*1. 用户关系表中 uid这行 的attends族中删除attends这列*/
Delete delAttendId = new Delete(Bytes.toBytes(uid));
delAttendId.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attendId));
dels.add(delAttendId);
/*2. 用户关系表中 attendId 这行的 fans族中删除 uid 这列*/
Delete delFansId = new Delete(Bytes.toBytes(attendId));
delFansId.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
dels.add(delFansId);
// 批量删除
relationTab.delete(dels);
/*3. 收件箱表中 uid 这行 删除 attendId 这列*/
Delete delInbox = new Delete(Bytes.toBytes(uid));
// 因为有多个版本, 要删除所有的版本需要使用addColumns方法
delInbox.addColumns(Bytes.toBytes("info"), Bytes.toBytes(attendId));
inboxTable.delete(delInbox);
}
}