3.3.4.4 包:com.atguigu.dataanalasis.outputformat
1. 类: MysqlOutput
因为是输出到 Mysql, 所以需要自定义 OutputFormat
package com.atguigu.dataanalasis.outputformat;
import com.atguigu.dataanalasis.bean.CommonDimension;
import com.atguigu.dataanalasis.bean.CountDurationValue;
import com.atguigu.dataanalasis.util.JDBCUtil;
import com.atguigu.dataanalasis.conversion.DimensionConversion;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * {@link OutputFormat} that writes reduce output to the MySQL table {@code tb_call} over JDBC
 * instead of writing files.
 *
 * <p>Each {@link CommonDimension} key / {@link CountDurationValue} value pair becomes one
 * {@code INSERT ... ON DUPLICATE KEY UPDATE} statement. Statements are batched and committed
 * in groups by {@link MysqlRecordWriter}.
 */
public class MysqlOutput extends OutputFormat<CommonDimension, CountDurationValue> {

    /** Committer handed to the framework; created lazily on the first request and then reused. */
    private FileOutputCommitter committer;

    /**
     * Creates the writer that sends records to MySQL.
     *
     * @param context the task attempt context (not used by this implementation)
     * @return a new {@link MysqlRecordWriter}
     * @throws IllegalStateException if the writer cannot set up its JDBC resources
     */
    @Override
    public RecordWriter<CommonDimension, CountDurationValue> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new MysqlRecordWriter();
    }

    /**
     * Intentionally performs no validation: the output goes to a database table, so there is
     * no output directory to check here.
     */
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    }

    /**
     * Returns a {@link FileOutputCommitter} built from the job's configured file output path.
     * The instance is created on the first call and cached for subsequent calls.
     *
     * @param context the task attempt context used to look up the output path
     * @return the cached committer
     */
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        if (committer == null) {
            Path output = FileOutputFormat.getOutputPath(context);
            committer = new FileOutputCommitter(output, context);
        }
        return committer;
    }

    /**
     * {@link RecordWriter} that upserts call statistics into {@code tb_call}.
     *
     * <p>Auto-commit is disabled on the connection; rows are accumulated in a JDBC batch and
     * committed every {@link #BATCH_SIZE} records, with the remainder committed in
     * {@link #close(TaskAttemptContext)}. Any JDBC failure is reported to the framework as an
     * {@link IOException} so the task fails instead of silently losing rows.
     */
    public static class MysqlRecordWriter extends RecordWriter<CommonDimension, CountDurationValue> {

        /** Upsert statement: parameters 1-5 are the inserted row, 6-7 the values used on a key clash. */
        private static final String UPSERT_SQL =
                "INSERT INTO tb_call VALUE(?, ?, ?, ?, ?) " +
                "ON DUPLICATE KEY " +
                "UPDATE call_count_sum=?, call_duration_sum=?";

        /** Number of buffered rows that triggers a batch execution and commit. */
        private static final int BATCH_SIZE = 500;

        private final Connection conn;
        private final PreparedStatement preparedStatement;
        private final DimensionConversion conversion;

        /** Rows added to the current batch since the last successful flush. */
        private int count;

        /**
         * Obtains the JDBC connection, disables auto-commit and prepares the upsert statement.
         *
         * @throws IllegalStateException if no connection is available or the statement cannot
         *         be prepared; the original {@link SQLException} is kept as the cause
         */
        public MysqlRecordWriter() {
            try {
                conversion = new DimensionConversion();
                conn = JDBCUtil.getInstance();
                if (conn == null) {
                    throw new IllegalStateException("JDBCUtil.getInstance() returned no MySQL connection");
                }
                // Commit manually so that each batch is written as one transaction.
                conn.setAutoCommit(false);
                preparedStatement = conn.prepareStatement(UPSERT_SQL);
                count = 0;
            } catch (SQLException e) {
                // Fail fast: continuing would only produce a NullPointerException on first use.
                throw new IllegalStateException("Failed to initialise the MySQL record writer", e);
            }
        }

        /**
         * Adds one record to the pending batch and flushes the batch once it holds
         * {@link #BATCH_SIZE} rows.
         *
         * @param key   the contact/date dimension pair identifying the row
         * @param value the call count and call duration totals for that pair
         * @throws IOException if resolving the dimension ids, binding the parameters or
         *         flushing the batch fails
         * @throws InterruptedException declared by the overridden method; not thrown here
         */
        @Override
        public void write(CommonDimension key, CountDurationValue value) throws IOException, InterruptedException {
            try {
                // Resolve the ids of the contact dimension and the date dimension.
                int contactId = conversion.getDimensionId(key.getContactDimension());
                int dateId = conversion.getDimensionId(key.getDateDimension());
                // Number of calls.
                int countSum = value.getCountSum();
                // Total call duration.
                int durationSum = value.getDurationSum();
                // Value of the main table's composite id column: "<contactId>_<dateId>".
                String idDataContact = contactId + "_" + dateId;

                preparedStatement.setString(1, idDataContact);
                preparedStatement.setInt(2, dateId);
                preparedStatement.setInt(3, contactId);
                preparedStatement.setInt(4, countSum);
                preparedStatement.setInt(5, durationSum);
                // Same totals again for the ON DUPLICATE KEY UPDATE clause.
                preparedStatement.setInt(6, countSum);
                preparedStatement.setInt(7, durationSum);
                preparedStatement.addBatch();
            } catch (SQLException e) {
                throw new IOException("Failed to add a row to the tb_call batch", e);
            }

            count++;
            if (count >= BATCH_SIZE) {
                flush();
            }
        }

        /**
         * Executes and commits whatever is left in the batch, then releases the statement and
         * the connection. The resources are released even when the final flush fails.
         *
         * @param context the task attempt context (not used by this implementation)
         * @throws IOException if the final batch cannot be executed or committed
         * @throws InterruptedException declared by the overridden method; not thrown here
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            try {
                // Always execute and commit before closing so residual rows are not lost.
                flush();
            } finally {
                JDBCUtil.close(null, preparedStatement, conn);
            }
        }

        /**
         * Executes the pending batch and commits the transaction. On failure the transaction
         * is rolled back so a later commit cannot persist a half-applied batch.
         *
         * @throws IOException wrapping the JDBC failure; a failed rollback is attached as a
         *         suppressed exception
         */
        private void flush() throws IOException {
            try {
                preparedStatement.executeBatch();
                conn.commit();
                count = 0;
            } catch (SQLException e) {
                IOException failure = new IOException("Failed to write batched rows to tb_call", e);
                try {
                    conn.rollback();
                } catch (SQLException rollbackError) {
                    failure.addSuppressed(rollbackError);
                }
                throw failure;
            }
        }
    }
}