3.3.4.4 包:com.atguigu.dataanalasis.outputformat

1. 类: MysqlOutput

因为是输出到 Mysql, 所以需要自定义 OutputFormat

package com.atguigu.dataanalasis.outputformat;

import com.atguigu.dataanalasis.bean.CommonDimension;
import com.atguigu.dataanalasis.bean.CountDurationValue;
import com.atguigu.dataanalasis.util.JDBCUtil;
import com.atguigu.dataanalasis.conversion.DimensionConversion;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlOutput extends OutputFormat<CommonDimension, CountDurationValue> {

    private FileOutputCommitter committer;

    @Override
    public RecordWriter<CommonDimension, CountDurationValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new MysqlRecordWriter();
    }

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {

    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        if (committer == null) {
            Path output = FileOutputFormat.getOutputPath(context);
            committer = new FileOutputCommitter(output, context);
        }
        return committer;
    }

    /**
     * 实现 MysqlRecordWriter 类
     */
    public static class MysqlRecordWriter extends RecordWriter<CommonDimension, CountDurationValue> {
        private Connection conn;
        private PreparedStatement preparedStatement;
        private DimensionConversion conversion;
        private String sql;
        private int cacheBound;
        private int count;

        public MysqlRecordWriter() {

            try {
                conversion = new DimensionConversion();
                conn = JDBCUtil.getInstance();
                conn.setAutoCommit(false);  // 不让mysql自动提交
                sql = "INSERT INTO tb_call VALUE(?, ?, ?, ?, ?) " +
                        "ON DUPLICATE KEY " +
                        "UPDATE call_count_sum=?, call_duration_sum=?";
                preparedStatement = conn.prepareStatement(sql);
                cacheBound = 500;
                count = 0;
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /**
         * 核心的写出方法(数据写到 Mysql)
         *
         * @param key
         * @param value
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void write(CommonDimension key, CountDurationValue value) throws IOException, InterruptedException {
            try {
                // 获取联系人维度的id和时间维度的id
                int contactId = conversion.getDimensionId(key.getContactDimension());
                int dateId = conversion.getDimensionId(key.getDateDimension());

                // 获取通话次数
                int countSum = value.getCountSum();

                // 获取通话时长
                int durationSum = value.getDurationSum();

                // 拼接主表的 id_data_contact 列的值
                String idDataContact = contactId + "_" + dateId;

                preparedStatement.setString(1, idDataContact);
                preparedStatement.setInt(2, dateId);
                preparedStatement.setInt(3, contactId);
                preparedStatement.setInt(4, countSum);
                preparedStatement.setInt(5, durationSum);
                preparedStatement.setInt(6, countSum);
                preparedStatement.setInt(7, durationSum);
                preparedStatement.addBatch();
                count++;
                if (count >= cacheBound) {
                    preparedStatement.executeBatch();
                    conn.commit();
                    count = 0;
                }

            } catch (SQLException e) {
                e.printStackTrace();
            }

        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // 收尾工作, 将残留的数据提交
            try {
                //在关闭资源之前, 先执行并提交数据
                preparedStatement.executeBatch();
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            JDBCUtil.close(null, preparedStatement, conn);
        }
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-01-14 05:27:28

results matching ""

    No results matching ""