11.2 写入数据到 Phoenix
11.2.1 在 Phoenix 创建表
create table gmall_order_info (
id varchar primary key,
province_id varchar,
consignee varchar,
order_comment varchar,
consignee_tel varchar,
order_status varchar,
payment_way varchar,
user_id varchar,
img_url varchar,
total_amount decimal,
expire_time varchar,
delivery_address varchar,
create_time varchar,
operate_time varchar,
tracking_no varchar,
parent_order_id varchar,
out_trade_no varchar,
trade_body varchar,
create_date varchar,
create_hour varchar)
11.2.2 写数据到 Phoenix
OrderInfo.scala
case class OrderInfo(id: String,
province_id: String,
var consignee: String,
order_comment: String,
var consignee_tel: String,
order_status: String,
payment_way: String,
user_id: String,
img_url: String,
total_amount: Double,
expire_time: String,
delivery_address: String,
create_time: String,
operate_time: String,
tracking_no: String,
parent_order_id: String,
out_trade_no: String,
trade_body: String,
var create_date: String,
var create_hour: String)
OrderApp
import java.text.SimpleDateFormat
import com.alibaba.fastjson.JSON
import com.atguigu.gmall0225.common.util.GmallConstant
import com.atguigu.gmall0225.realtime.bean.OrderInfo
import com.atguigu.gmall0225.realtime.util.MyKafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object OrderApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("OrderApp")
val ssc = new StreamingContext(conf, Seconds(2))
val sourceDStream: InputDStream[(String, String)] = MyKafkaUtil.getKafkaStream(ssc, GmallConstant.TOPIC_ORDER)
val orerInfoDStream: DStream[OrderInfo] = sourceDStream.map {
case (_, value) => {
val orderInfo = JSON.parseObject(value, classOf[OrderInfo])
orderInfo.consignee = orderInfo.consignee.substring(0, 1) + "**"
orderInfo.consignee_tel = orderInfo.consignee_tel.substring(0, 3) +
"****" + orderInfo.consignee_tel.substring(7, 11)
val date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(orderInfo.create_time)
orderInfo.create_date = new SimpleDateFormat("yyyy-MM-dd").format(date)
orderInfo.create_hour = new SimpleDateFormat("HH").format(date)
orderInfo
}
}
import org.apache.phoenix.spark._
orerInfoDStream.foreachRDD(rdd => {
rdd.saveToPhoenix(
"GMALL_ORDER_INFO",
Seq("ID", "PROVINCE_ID", "CONSIGNEE", "ORDER_COMMENT", "CONSIGNEE_TEL", "ORDER_STATUS", "PAYMENT_WAY", "USER_ID", "IMG_URL", "TOTAL_AMOUNT", "EXPIRE_TIME", "DELIVERY_ADDRESS", "CREATE_TIME", "OPERATE_TIME", "TRACKING_NO", "PARENT_ORDER_ID", "OUT_TRADE_NO", "TRADE_BODY", "CREATE_DATE", "CREATE_HOUR"),
zkUrl = Some("hadoop201,hadoop202,hadoop203:2181"))
})
orerInfoDStream.print
ssc.start()
ssc.awaitTermination()
}
}