3.3.4.3 包:com.atguigu.dataanalasis.mapred
1. 类: CallDriver
驱动类
package com.atguigu.dataanalasis.mapred;
import com.atguigu.dataanalasis.bean.CommonDimension;
import com.atguigu.dataanalasis.outputformat.MysqlOutput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver that configures and submits the call-log aggregation job.
 *
 * <p>The job scans an HBase table with {@link CallMapper}, aggregates per-key values with
 * {@link CallReducer}, and writes the reducer output through {@link MysqlOutput}.
 *
 * <p>The source table and the location of the MySQL JDBC driver jar default to the values this
 * job originally hard-coded. They can be overridden on the command line, for example
 * {@code -D calllog.input.table=ns:table} or {@code -D calllog.mysql.connector.jar=hdfs://...}.
 * Both work because the driver is launched through {@link ToolRunner}.
 */
public class CallDriver implements Tool {

    /** Configuration key that overrides the HBase source table ({@code namespace:table}). */
    private static final String TABLE_KEY = "calllog.input.table";

    /** Source table used when {@link #TABLE_KEY} is not set. */
    private static final String DEFAULT_TABLE = "ns_telecom:calllog";

    /** Configuration key that overrides the path of the MySQL JDBC driver jar. */
    private static final String MYSQL_JAR_KEY = "calllog.mysql.connector.jar";

    /** Driver jar path used when {@link #MYSQL_JAR_KEY} is not set. */
    private static final String DEFAULT_MYSQL_JAR =
            "hdfs://hadoop201:9000/libs/mysql-connector-java-5.1.27-bin.jar";

    /** Configuration injected by ToolRunner (already populated with generic options). */
    private Configuration conf;

    /**
     * Builds the job, submits it and blocks until it finishes.
     *
     * @param args remaining command-line arguments after generic-option parsing (unused)
     * @return {@code 0} if the job succeeded, {@code 1} otherwise
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Start from the injected Configuration so -D / -conf options reach the job.
        // Job.getInstance() with no argument would build a fresh Configuration and drop them.
        Configuration base = getConf() != null ? getConf() : new Configuration();
        Job job = Job.getInstance(base);
        job.setJarByClass(CallDriver.class);

        // Mapper: full-table scan. A one-off MapReduce scan should not churn the
        // RegionServer block cache, so caching of blocks is disabled.
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob(
                base.get(TABLE_KEY, DEFAULT_TABLE),
                scan,
                CallMapper.class,
                CommonDimension.class,
                Text.class,
                job);

        // Reducer
        job.setReducerClass(CallReducer.class);

        // Output: MysqlOutput needs the JDBC driver on the task classpath.
        job.setOutputFormatClass(MysqlOutput.class);
        job.addFileToClassPath(new Path(base.get(MYSQL_JAR_KEY, DEFAULT_MYSQL_JAR)));

        // Submit and wait, printing progress.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Entry point. Runs the tool through ToolRunner, prints a success/failure message and exits
     * with the tool's return code.
     */
    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new CallDriver(), args);
        String s = run == 0 ? "正常结束" : "发生异常";
        System.out.println(s);
        System.exit(run);
    }
}
2. 类: CallMapper
package com.atguigu.dataanalasis.mapred;
import com.atguigu.dataanalasis.bean.CommonDimension;
import com.atguigu.dataanalasis.bean.ContactDimension;
import com.atguigu.dataanalasis.bean.DateDimension;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Turns each HBase call-log row into three (dimension, duration) pairs.
 *
 * <p>Each row is emitted at day, month and year granularity. Month and year granularity are
 * marked by setting day (and, for year, month) to {@code "-1"}.
 */
public class CallMapper extends TableMapper<CommonDimension, Text> {

    /** Counter group/name used to report row keys that could not be parsed. */
    private static final String COUNTER_GROUP = "CallMapper";
    private static final String MALFORMED_ROW_KEY = "MALFORMED_ROW_KEY";

    /** Minimum number of '_'-separated fields the row key must have (indices 0..4 are read). */
    private static final int MIN_FIELDS = 5;

    /** Length of the "yyyy-MM-dd" prefix read from the start-time field. */
    private static final int DATE_PREFIX_LENGTH = 10;

    /** Phone number -> contact name lookup, filled once in {@link #setup}. */
    private Map<String, String> contacts = new HashMap<>();

    // Output objects are reused across map() calls; context.write is called before each mutation.
    private Text v = new Text();
    private CommonDimension commonDimension = new CommonDimension();
    private ContactDimension contactDimension = new ContactDimension();
    private DateDimension dateDimension = new DateDimension();

    /**
     * Called once at the start of the task. Loads the hard-coded phone-number to name table.
     *
     * @param context task context (unused)
     * @throws IOException never thrown here; declared by the Mapper contract
     * @throws InterruptedException never thrown here; declared by the Mapper contract
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        contacts.put("15369468720", "李雁");
        contacts.put("19920860202", "卫艺");
        contacts.put("18411925860", "仰莉");
        contacts.put("14473548449", "陶欣悦");
        contacts.put("18749966182", "施梅梅");
        contacts.put("19379884788", "金虹霖");
        contacts.put("19335715448", "魏明艳");
        contacts.put("18503558939", "华贞");
        contacts.put("13407209608", "华啟倩");
        contacts.put("15596505995", "仲采绿");
        contacts.put("17519874292", "卫丹");
        contacts.put("15178485516", "戚丽红");
        contacts.put("19877232369", "何翠柔");
        contacts.put("18706287692", "钱溶艳");
        contacts.put("18944239644", "钱琳");
        contacts.put("17325302007", "缪静欣");
        contacts.put("18839074540", "焦秋菊");
        contacts.put("19879419704", "吕访琴");
        contacts.put("16480981069", "沈丹");
        contacts.put("18674257265", "褚美丽");
        contacts.put("18302820904", "孙怡");
        contacts.put("15133295266", "许婵");
        contacts.put("17868457605", "曹红恋");
        contacts.put("15490732767", "吕柔");
        contacts.put("15064972307", "冯怜云");
    }

    /**
     * Parses the row key and writes the call duration three times: at day, month and year
     * granularity.
     *
     * <p>Row keys with too few fields or a too-short start-time field are skipped and counted
     * under {@code CallMapper/MALFORMED_ROW_KEY}, instead of failing the whole task.
     *
     * @param key the HBase row key
     * @param value the row's cells (not read; everything needed is encoded in the row key)
     * @param context used to emit output and increment counters
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Decode only [offset, offset + length): the backing array may be larger than the key.
        String rowKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());
        // Example: 0005_19877232369_2019-09-12 04:19:10_13407209608_2988_1
        String[] split = rowKey.split("_");
        if (split.length < MIN_FIELDS || split[2].length() < DATE_PREFIX_LENGTH) {
            context.getCounter(COUNTER_GROUP, MALFORMED_ROW_KEY).increment(1);
            return;
        }
        String call1 = split[1];
        String startTime = split[2];
        String duration = split[4];

        // startTime looks like "2019-09-12 04:19:10"
        String year = startTime.substring(0, 4);
        String month = startTime.substring(5, 7);
        String day = startTime.substring(8, 10);

        // Value: the call duration as text.
        v.set(duration);

        // Key: contact + date dimensions.
        commonDimension.setContactDimension(contactDimension);
        commonDimension.setDateDimension(dateDimension);
        contactDimension.setTelephone(call1);
        // NOTE(review): name is null for numbers missing from `contacts` — confirm
        // ContactDimension serialization tolerates that.
        contactDimension.setName(contacts.get(call1));

        // Each HBase row yields 3 output records (the original note says they end up in the
        // MySQL tb_contacts table — confirm against MysqlOutput).

        // Day granularity.
        dateDimension.setYear(year);
        dateDimension.setMonth(month);
        dateDimension.setDay(day);
        context.write(commonDimension, v);

        // Month granularity: day set to -1.
        dateDimension.setDay("-1");
        context.write(commonDimension, v);

        // Year granularity: month and day set to -1.
        dateDimension.setMonth("-1");
        context.write(commonDimension, v);
    }
}
3. 类: CallReducer
package com.atguigu.dataanalasis.mapred;
import com.atguigu.dataanalasis.bean.CommonDimension;
import com.atguigu.dataanalasis.bean.CountDurationValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Aggregates, per dimension key, the number of calls and the total call duration.
 *
 * <p>The result is written as a {@link CountDurationValue}. Turning it into MySQL rows is left
 * to the job's output format.
 */
public class CallReducer extends Reducer<CommonDimension, Text, CommonDimension, CountDurationValue> {

    /** Reused output value; it is written out before being overwritten on the next key. */
    private CountDurationValue countDurationValue = new CountDurationValue();

    /**
     * Counts the values for {@code key} and sums their durations.
     *
     * @param key the aggregation dimension
     * @param values call durations, each a decimal integer in text form
     * @param context used to emit the aggregate
     * @throws NumberFormatException if a value is not a valid int
     * @throws ArithmeticException if the duration total does not fit in an int
     */
    @Override
    protected void reduce(CommonDimension key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int countSum = 0;
        // Accumulate in a long so an int overflow is detected instead of silently wrapping
        // (year-level keys aggregate every call of a contact in that year).
        long durationSum = 0L;
        for (Text value : values) {
            durationSum += Integer.parseInt(value.toString());
            countSum++;
        }
        if (durationSum > Integer.MAX_VALUE || durationSum < Integer.MIN_VALUE) {
            throw new ArithmeticException(
                    "duration sum " + durationSum + " overflows int for key " + key);
        }

        countDurationValue.setCountSum(countSum);
        countDurationValue.setDurationSum((int) durationSum);
        context.write(key, countDurationValue);
    }
}