11.3 Preparation

11.3.1 ETL Data Cleaning

ETL is short for Extract-Transform-Load. It describes the process of extracting data from a source system, transforming it, and loading it into a target system.

Characteristics of the raw data:

  1. Looking at the raw data, the fields are separated by \t (tab).

  2. A video can belong to multiple categories. The categories are separated by &, and there is a space on each side of the &.

  3. The related videos of a video are also separated by \t.

  4. A record may have no related videos.

  5. There are 10 columns in total, but the related-video column may be missing, so any record with fewer than 9 columns is treated as dirty data.

Cleaning operations

  1. Remove dirty data: records with fewer than 9 columns.

  2. To simplify later analysis, remove the spaces around the & in the video category field, so categories are separated by & only.

  3. Multiple related videos should also be joined with & (see the hypothetical before/after example below).
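
To make the rules concrete, here is a hypothetical record before and after cleaning. The field values are invented for illustration; only the shape (9 fixed fields, a category written with " & ", tab-separated related videos) follows the description above.

Before: id1\tuploader1\t100\tPeople & Blogs\t300\t1000\t4.5\t100\t10\trel1\trel2
After:  id1\tuploader1\t100\tPeople&Blogs\t300\t1000\t4.5\t100\t10\trel1&rel2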

The MapReduce program used for data cleaning

Step 1: Add the dependencies

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.2</version>
</dependency>

Step 2: The Mapper class

package com.atguigu;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class VideoETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private Text resultKey = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Clean the record; ETLUtil.etl returns null for dirty data
        String etlString = ETLUtil.etl(value.toString());
        if (StringUtils.isBlank(etlString)) return;
        // 2. Write out the cleaned record (the value is NullWritable)
        resultKey.set(etlString);
        context.write(resultKey, NullWritable.get());
    }
}

Step 3: The ETLUtil class

package com.atguigu;

public class ETLUtil {
    public static String etl(String s) {
        // 1. Split the line on \t to get all the fields
        String[] splits = s.split("\t");
        if (splits.length < 9) return null;  // fewer than 9 columns means dirty data, return null directly
        // 2. Remove the spaces in the video category field so categories are separated by & only
        splits[3] = splits[3].replaceAll(" ", "");
        // 3. Join the fields back together: fields 0-8 are separated by \t,
        //    fields from index 9 on (the related videos) are joined with &
        StringBuilder sb = new StringBuilder();
        String joinStr;  // separator appended after each field
        for (int i = 0; i < splits.length; i++) {
            joinStr = i < 9 ? "\t" : "&";
            joinStr = i == splits.length - 1 ? "" : joinStr;  // no separator after the last field
            sb.append(splits[i]).append(joinStr);
        }
        // 4. Return the cleaned, re-joined record
        return sb.toString();
    }
}
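
Because junit is already declared in Step 1, the cleaning logic can be sanity-checked locally before the job is submitted. The test below is a minimal sketch and not part of the original program; the input line is made up, and only its shape (9 fixed fields plus related videos, category written as "People & Blogs") follows the rules described earlier.

package com.atguigu;

import org.junit.Assert;
import org.junit.Test;

public class ETLUtilTest {

    @Test
    public void cleansAWellFormedLine() {
        // Hypothetical input: 9 fixed fields, then two related-video fields
        String in = "id1\tuploader1\t100\tPeople & Blogs\t300\t1000\t4.5\t100\t10\trel1\trel2";
        // Spaces are removed from the category and related videos are joined with &
        String expected = "id1\tuploader1\t100\tPeople&Blogs\t300\t1000\t4.5\t100\t10\trel1&rel2";
        Assert.assertEquals(expected, ETLUtil.etl(in));
    }

    @Test
    public void rejectsADirtyLine() {
        // Fewer than 9 columns is dirty data, so etl() returns null
        Assert.assertNull(ETLUtil.etl("only\tthree\tcolumns"));
    }
}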

Step 4: The Runner class

package com.atguigu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.io.Text;

public class VideoETLRunner implements Tool {

    private Configuration conf;

    public int run(String[] strings) throws Exception {
        // 1. Get the Job object
        Job job = Job.getInstance(getConf());
        // 2. Set the main class
        job.setJarByClass(VideoETLRunner.class);
        // 3. Set the Mapper class
        job.setMapperClass(VideoETLMapper.class);
        // 4. No reduce phase is needed
        job.setNumReduceTasks(0);

        // 5. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // 6. Set the input path
        FileInputFormat.setInputPaths(job, new Path("/gulivideo/video/2008/0222"));
        // 7. Set the output path
        FileOutputFormat.setOutputPath(job, new Path("/output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) throws Exception {
        int resultCode = ToolRunner.run(new VideoETLRunner(), args);
        if (resultCode == 0) {
            System.out.println("success!");
        } else {
            System.out.println("failed!");
        }
    }
}
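
Note that the Runner above hard-codes the input and output paths. If you would rather pass them on the command line, the two path lines in run() could be replaced with the sketch below, which assumes the paths are supplied as the first two program arguments forwarded by ToolRunner:

// Hypothetical variant: read the paths from the command line, e.g.
// yarn jar gulivideo-1.0-SNAPSHOT.jar com.atguigu.VideoETLRunner <inputPath> <outputPath>
FileInputFormat.setInputPaths(job, new Path(strings[0]));
FileOutputFormat.setOutputPath(job, new Path(strings[1]));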

Step 5: Build the jar and run it on the cluster

# Upload the raw data to HDFS
hdfs dfs -put 'guiliVideo' /
yarn jar gulivideo-1.0-SNAPSHOT.jar com.atguigu.VideoETLRunner
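
After the job completes, you can spot-check the cleaned output in HDFS (assuming the /output path configured in the Runner and the part-m-00000 file produced by a map-only job):

hdfs dfs -ls /output
hdfs dfs -cat /output/part-m-00000 | head -n 3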

11.3.2 Creating the Tables

Create 4 tables:

  • 2 tables in the original text format (holding the data after ETL)

  • 2 tables stored in ORC format (data loaded from the original tables)

Create gulivideo_ori:

create table gulivideo_ori(
    videoId string, 
    uploader string, 
    age int, 
    category array<string>, 
    length int, 
    views int, 
    rate float, 
    ratings int, 
    comments int,
    relatedId array<string>)
row format delimited 
fields terminated by "\t"
collection items terminated by "&"
stored as textfile;

Load the data:

load data inpath '/output' into table gulivideo_ori;

Create gulivideo_user_ori:

create table gulivideo_user_ori(
    uploader string,
    videos int,
    friends int
)
row format delimited 
fields terminated by "\t" 
stored as textfile;

Load the data:

load data inpath '/gulivideo/user/2008/0903' into table gulivideo_user_ori;

Create gulivideo_orc:

create table gulivideo_orc 
stored as orc
as select * from gulivideo_ori;

Create gulivideo_user_orc:

create table gulivideo_user_orc 
stored as orc
as select * from gulivideo_user_ori;
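
As a quick sanity check after the CREATE TABLE ... AS SELECT statements finish, the row counts of the ORC tables should match the original tables; for example:

select count(*) from gulivideo_orc;
select count(*) from gulivideo_user_orc;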
