Weibo类

这个类中放了各种具体的业务逻辑操作

package com.atguigu.weibo;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Weibo {

    public static void main(String[] args) throws IOException {
        // 程序的入口, 做各种测试用

        createNamespace();

        createContentTable();
        createRelationTable();
        createInboxTable();

//        addAttend("1001", "1002");
//        addAttend("1001", "1003");
//        addAttend("1002", "1003");

//        addData("1002", "1002->1");
//        addData("1002", "1002 ---> 2");
//        addData("1002", "1002 ---> 3");
//        addData("1002", "1002 ---> 4");
//        addData("1002", "1002 ---> 5");
//        addData("1003", "1003 ---> 1");
        showMsg("1001");
//        addData("1003", "1003 ---> 3");
        removeAttends("1001", "1003");
        showMsg("1001");
    }


    /**
     * 创建命名空间
     *
     * @throws IOException
     */
    public static void createNamespace() throws IOException {
        WeiboUtil.createNameSpace(WeiboConstant.NAMESPACE);
    }

    /**
     * 创建内容表
     *
     * @throws IOException
     */
    public static void createContentTable() throws IOException {
        WeiboUtil.createTable(WeiboConstant.TBL_CONTENT, 1, "info");
    }

    /**
     * 创建用户关系表
     *
     * @throws IOException
     */
    public static void createRelationTable() throws IOException {
        WeiboUtil.createTable(WeiboConstant.TBL_RELATION, 1, "attends", "fans");
    }

    /**
     * 创建收件箱表
     *
     * @throws IOException
     */
    public static void createInboxTable() throws IOException {
        WeiboUtil.createTable(WeiboConstant.TBL_INBOX, 3, "info");
    }

    /**
     * 发布微博
     * 需要做两件事:
     * 1. 把数据存入到指定用户内容表中
     * 2. 把这条微博推送到收件箱表中
     *
     * @param uid     该条微博的uid
     * @param content 该条微博的文本内容
     */
    public static void addData(String uid, String content) throws IOException {
        Connection conn = ConnectionFactory.createConnection(WeiboUtil.conf);

        // 根据用户id, 为每一条微博生成一个唯一的 rowKey
        String rowKey = uid + "_" + System.currentTimeMillis();

        // 1. 把数据存入到内容表中
        Table contentTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_CONTENT));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content));
        contentTable.put(put);

        // 2. 把这条微博推送到收件箱表中
        //2.1 在用户关系表中找到操作者的所有粉丝
        List<byte[]> fansUids = new ArrayList<>();  // 存储这个用户的所有的粉丝的 uid
        Table relationTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_RELATION));
        Get get = new Get(Bytes.toBytes(uid));
        get.addFamily(Bytes.toBytes("fans"));
        Result result = relationTable.get(get);
        List<Cell> cells = result.listCells();
        if (cells != null) {
            for (Cell cell : cells) {
                fansUids.add(CellUtil.cloneQualifier(cell));
            }
        }
        //2.2 推送到收件箱表中  行就是粉丝的id
        Table inboxTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_INBOX));
        for (byte[] fansUid : fansUids) {
            Put put1 = new Put(fansUid);  // 收件箱中的rowkey就是粉丝的id
            // 列族: info, 列: 用户id, 值: 每条微博的rowkew
            put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), Bytes.toBytes(rowKey));
            inboxTable.put(put1);
        }

        // 关闭资源
        contentTable.close();
        relationTable.close();
        inboxTable.close();
        conn.close();
    }

    /**
     * 关注用户 功能
     * <p>
     * 1. 在用户关系表中 uid(关注人) 这一行 attends 族中添加一列: 列名就是attendedUid(被关注人), 列值可以不需要
     * 2. 在用户关系表中 attendedUid(被关注人) 这一行的 fans 族中添加一列: 列名是 uid(关注人,其实就是粉丝), 列值可以不需要
     * 3. 在收件箱表中 uid(关注人, 是别人的粉丝) 这一行, info 族添加一列: 列名是 attendedUid, 列值就是 attendedUid 的最新的三条微博.
     *
     * @param uid         需要关注用户的用户id
     * @param attendedUid 被关注的用户的id
     */
    public static void addAttend(String uid, String attendedUid) throws IOException {
        Connection conn = ConnectionFactory.createConnection(WeiboUtil.conf);

        /*1. 在用户关系表中 uid(关注人) 这一行 attends 族中添加一列: 列名就是attendedUid(被关注人), 列值可以不需要*/
        Table relationTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_RELATION));
        Put put1 = new Put(Bytes.toBytes(uid));
        put1.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attendedUid), Bytes.toBytes(""));
        relationTable.put(put1);

        /*2. 在用户关系表中 attendedUid(被关注人) 这一行的 fans 族中添加一列: 列名是 uid(关注人,其实就是粉丝), 列值可以不需要*/
        Put put2 = new Put(Bytes.toBytes(attendedUid));
        put2.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(""));
        relationTable.put(put2);

        /*3. 在收件箱表中 uid(关注人, 是别人的粉丝) 这一行, info 族添加一列: 列名是 attendedUid, 列值就是 attendedUid的最新的三条微博的rowkey.*/
        Table inboxTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_INBOX));
        Table contentTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_CONTENT));
        Scan scan = new Scan(Bytes.toBytes(attendedUid), Bytes.toBytes(attendedUid + "_a"));// 扫描出attendedUid_......的所有微博
        ResultScanner results = contentTable.getScanner(scan);
        for (Result result : results) {  // 即使有多条微博, 最终也会只保留 3 条
            byte[] row = result.getRow();
            Put put3 = new Put(Bytes.toBytes(attendedUid));
            put3.addColumn(
                    Bytes.toBytes("info"),
                    Bytes.toBytes(attendedUid),
                    Long.parseLong(Bytes.toString(row).split("_")[1]),  // 时间戳保持和添加微博时候的时间戳一致
                    row);
            inboxTable.put(put3);
        }
    }

    /**
     * 显示指定用户的初始化信息:
     * 1. 自己的最新的 3 条微博
     * <p>
     * 2. 每个关注的人的最新 3 条微博
     *
     * @param uid
     */
    public static void showMsg(String uid) throws IOException {
        Connection conn = ConnectionFactory.createConnection(WeiboUtil.conf);

        Table contentTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_CONTENT));
        Table inboxTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_INBOX));
        Table relationTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_RELATION));

        /*1. 显示操作者的条微博*/
        Scan scan1 = new Scan(Bytes.toBytes(uid), Bytes.toBytes(uid + "_a"));
        ResultScanner results = contentTable.getScanner(scan1);

        System.out.println("操作者" + uid + "的所有微博");
        for (Result result : results) {
            List<Cell> cells = result.listCells();
            System.out.println(Bytes.toString(CellUtil.cloneRow(cells.get(0))) + " : " + Bytes.toString(CellUtil.cloneValue(cells.get(0))));
        }

        /*2. 显示操作者关注的所有人的最新 3 条微博 在收件箱中查找*/
        Get inboxGet = new Get(Bytes.toBytes(uid));
        inboxGet.addFamily(Bytes.toBytes("info"));
        inboxGet.setMaxVersions(3);
        Result result = inboxTable.get(inboxGet);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            byte[] rowKey = CellUtil.cloneRow(cell);
            System.out.println(Bytes.toString(rowKey) + "关注的 :  " + Bytes.toString(CellUtil.cloneQualifier(cell)) + " 的微博:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }

    /**
     * 取消关注:
     * 1. 用户关系表中 uid这行 的attends族中删除attendId这列
     *
     * 2. 用户关系表中 attendId 这行的 fans族中删除 uid 这列
     *
     * 3. 收件箱表中 uid 这行 删除 attendId 这列
     *
     * @param uid 操作的用户
     * @param attendId 取消关注谁
     */
    public static void removeAttends(String uid, String attendId) throws IOException {
        Connection conn = ConnectionFactory.createConnection(WeiboUtil.conf);
        Table relationTab = conn.getTable(TableName.valueOf(WeiboConstant.TBL_RELATION));
        Table inboxTable = conn.getTable(TableName.valueOf(WeiboConstant.TBL_INBOX));

        List<Delete> dels = new ArrayList<>();
        /*1. 用户关系表中 uid这行 的attends族中删除attends这列*/
        Delete delAttendId = new Delete(Bytes.toBytes(uid));
        delAttendId.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attendId));
        dels.add(delAttendId);
        /*2. 用户关系表中 attendId 这行的 fans族中删除 uid 这列*/
        Delete delFansId = new Delete(Bytes.toBytes(attendId));
        delFansId.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
        dels.add(delFansId);
        // 批量删除

        relationTab.delete(dels);

        /*3. 收件箱表中 uid 这行 删除 attendId 这列*/
        Delete delInbox = new Delete(Bytes.toBytes(uid));
        // 因为有多个版本, 要删除所有的版本需要使用addColumns方法
        delInbox.addColumns(Bytes.toBytes("info"), Bytes.toBytes(attendId));
        inboxTable.delete(delInbox);


    }

}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-03-26 08:52:09

results matching ""

    No results matching ""