14.2 Completing the Business Code from DW to ES
Use Spark SQL to query the data from Hive, then write it into ES.
14.2.1 Create an Index in ES for the Wide Table
The request below uses the Kibana Dev Tools console syntax and the ES 6.x single-type (_doc) mapping. The sku_name and category-name fields are analyzed with ik_max_word, which requires the IK analysis plugin (elasticsearch-analysis-ik) to be installed.
PUT gmall_sale_detail
{
  "mappings": {
    "_doc": {
      "properties": {
        "user_id":            { "type": "keyword" },
        "sku_id":             { "type": "keyword" },
        "user_gender":        { "type": "keyword" },
        "user_age":           { "type": "short" },
        "user_level":         { "type": "keyword" },
        "order_price":        { "type": "double" },
        "sku_name":           { "type": "text", "analyzer": "ik_max_word" },
        "sku_tm_id":          { "type": "keyword" },
        "sku_category3_id":   { "type": "keyword" },
        "sku_category2_id":   { "type": "keyword" },
        "sku_category1_id":   { "type": "keyword" },
        "sku_category3_name": { "type": "text", "analyzer": "ik_max_word" },
        "sku_category2_name": { "type": "text", "analyzer": "ik_max_word" },
        "sku_category1_name": { "type": "text", "analyzer": "ik_max_word" },
        "spu_id":             { "type": "keyword" },
        "sku_num":            { "type": "long" },
        "order_count":        { "type": "long" },
        "order_amount":       { "type": "double" },
        "dt":                 { "type": "keyword" }
      }
    }
  }
}
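To confirm the index was created with the expected field types, query the mapping back from the same console:
GET gmall_sale_detail/_mapping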
14.2.2 Create SaleDetailDayCount
Create the bean class com.atguigu.dw.gmalldw2es.bean.SaleDetailDayCount to encapsulate the wide-table rows. The field names must match the column aliases returned by the query in the next step, because the as[SaleDetailDayCount] encoder maps columns to fields by name; dt is declared as var, so it remains mutable after construction.
package com.atguigu.dw.gmalldw2es.bean

case class SaleDetailDayCount(user_id: String,
                              sku_id: String,
                              user_gender: String,
                              user_age: Int,
                              user_level: String,
                              order_price: Double,
                              sku_name: String,
                              sku_tm_id: String,
                              sku_category1_id: String,
                              sku_category2_id: String,
                              sku_category3_id: String,
                              sku_category1_name: String,
                              sku_category2_name: String,
                              sku_category3_name: String,
                              spu_id: String,
                              sku_num: Long,
                              order_count: Long,
                              order_amount: Double,
                              var dt: String)
14.2.3 Create SaleDetailApp
package com.atguigu.dw.gmalldw2es.app

import com.atguigu.dw.gmall.common.util.MyESUtil
import com.atguigu.dw.gmalldw2es.bean.SaleDetailDayCount
import org.apache.spark.sql.{Dataset, SparkSession}

/**
 * Author lzc
 * Date 2019/5/21 6:10 PM
 */
object SaleDetailApp {
  def main(args: Array[String]): Unit = {
    // The date to export; defaults to 2019-05-20 when no argument is passed
    val date = if (args.length > 0) args(0) else "2019-05-20"

    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("SaleDetailApp")
      .enableHiveSupport() // required to read tables from the Hive warehouse
      .getOrCreate()
    import spark.implicits._

    // Cast the numeric columns so they line up with the field types of SaleDetailDayCount
    val sql =
      s"""
         |select
         |    user_id,
         |    sku_id,
         |    user_gender,
         |    cast(user_age as int) user_age,
         |    user_level,
         |    cast(order_price as double) order_price,
         |    sku_name,
         |    sku_tm_id,
         |    sku_category3_id,
         |    sku_category2_id,
         |    sku_category1_id,
         |    sku_category3_name,
         |    sku_category2_name,
         |    sku_category1_name,
         |    spu_id,
         |    sku_num,
         |    cast(order_count as bigint) order_count,
         |    cast(order_amount as double) order_amount,
         |    dt
         |from dws_sale_detail_daycount
         |where dt='$date'
       """.stripMargin
    spark.sql("use gmall")
    val ds: Dataset[SaleDetailDayCount] = spark.sql(sql).as[SaleDetailDayCount]

    // Write one bulk request per partition instead of one connection per record
    ds.foreachPartition(it => {
      MyESUtil.insertBulk("gmall_sale_detail", it.toList)
    })
    spark.stop()
  }
}
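MyESUtil lives in the gmall-common module and is not shown in this chapter. For reference, the following is a minimal sketch of what insertBulk could look like, assuming the Jest client (io.searchbox) and an ES node at http://hadoop102:9200 — both the client choice and the address are assumptions, so adapt them to your project:

package com.atguigu.dw.gmall.common.util

import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, Index}

object MyESUtil {
  // Assumed ES address; change to match your cluster
  private val ES_URL = "http://hadoop102:9200"

  private val factory = new JestClientFactory
  factory.setHttpClientConfig(
    new HttpClientConfig.Builder(ES_URL)
      .multiThreaded(true)
      .build())

  // Bulk-insert a batch of bean objects into the given index
  def insertBulk(indexName: String, sources: Iterable[AnyRef]): Unit =
    if (sources.nonEmpty) {
      val client: JestClient = factory.getObject
      val bulkBuilder = new Bulk.Builder()
        .defaultIndex(indexName)
        .defaultType("_doc")
      // Jest serializes each source object to JSON and adds it as an index action
      sources.foreach(source => bulkBuilder.addAction(new Index.Builder(source).build()))
      client.execute(bulkBuilder.build())
      client.close() // Jest 6.x; earlier versions use shutdownClient()
    }
}

When running on a real cluster rather than local[*], drop the hard-coded master and pass it at submit time, e.g. spark-submit --class com.atguigu.dw.gmalldw2es.app.SaleDetailApp --master yarn gmall-dw2es.jar 2019-05-20 (the jar name here is a placeholder).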