7.3 使用 service 层来从 es 读取数据

为了书写 json 方便, 我们使用 scala 来完成.

定义 Service 接口

package com.atguigu.dw.gmallpublisher.service

trait PublisherService {
    /**
      * Returns the total number of daily active users (DAU) for the given date.
      *
      * @param date the date to query, formatted like 2019-05-15
      * @return the total DAU count for that date
      */
    def getDauTotal(date: String): Long

    /**
      * Returns the per-hour breakdown of daily active users for the given date.
      *
      * @param date the date to query (presumably the same 2019-05-15 format
      *             as [[getDauTotal]] -- confirm with callers)
      * @return a map of counts; judging by the method name the keys are hours
      *         and the values are the user counts for that hour
      */
    def getDauHour2countMap(date: String): Map[String, Long]
}

实现 Service 接口

package com.atguigu.dw.gmallpublisher.service

import java.util

import com.atguigu.dw.gmall.common.constant.GmallConstant
import io.searchbox.client.JestClient
import io.searchbox.core.search.aggregation.TermsAggregation
import io.searchbox.core.{Search, SearchResult}
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

import scala.collection.mutable

// Registers this class as a Spring bean so it can be injected wherever a PublisherService is needed.
@Service
class PublisherServiceImp extends PublisherService {
    // Populated by Spring field injection after construction, hence a var initialised to null.
    @Autowired
    private var jestClient: JestClient = _

    /**
      * Returns the total number of daily active users (DAU) for the given date.
      *
      * Runs a term filter on `logDate` against the DAU index and reads the hit total.
      *
      * @param date the date to query, formatted like 2019-05-15
      * @return the hit total reported by Elasticsearch, or 0 if the response carries no total
      * @throws IllegalStateException if Elasticsearch reports the search as failed
      */
    override def getDauTotal(date: String): Long = {
        val queryDSL = s"""{ "query": ${dateFilter(date)} }"""
        val result = executeSearch(queryDSL)
        // getTotal yields null when the response has no hits.total element.
        Option(result.getTotal).map(_.toLong).getOrElse(0L)
    }

    /**
      * Returns the per-hour breakdown of daily active users for the given date.
      *
      * Filters on `logDate` and groups the matching documents by `logHour`
      * with a terms aggregation (at most 24 buckets, one per hour of the day).
      *
      * @param date the date to query, formatted like 2019-05-15
      * @return an immutable map from the `logHour` bucket key to its document count;
      *         empty if the response does not contain the aggregation
      * @throws IllegalStateException if Elasticsearch reports the search as failed
      */
    override def getDauHour2countMap(date: String): Map[String, Long] = {
        // Local import keeps this block self-contained; JavaConverters exists in 2.11 through 2.13.
        import scala.collection.JavaConverters._

        val queryDSL =
            s"""
               |{
               |  "query": ${dateFilter(date)},
               |  "aggs": {
               |    "groupby_hour": {
               |      "terms": {
               |        "field": "logHour",
               |        "size": 24
               |      }
               |    }
               |  }
               |}
             """.stripMargin

        val result = executeSearch(queryDSL)
        // getTermsAggregation returns null when the named aggregation is absent from the response.
        Option(result.getAggregations.getTermsAggregation("groupby_hour"))
            .fold(Map.empty[String, Long]) { hourAgg =>
                val buckets: util.List[TermsAggregation#Entry] = hourAgg.getBuckets
                buckets.asScala.map(bucket => bucket.getKey -> bucket.getCount.toLong).toMap
            }
    }

    /**
      * Builds the JSON body of the `query` clause that keeps only documents whose
      * `logDate` equals the given date.
      *
      * Backslashes and double quotes are escaped so the value cannot terminate the
      * JSON string literal and alter the structure of the query.
      */
    private def dateFilter(date: String): String = {
        // String.valueOf keeps the old behaviour for a null date (it is rendered as "null").
        val escaped = String.valueOf(date).replace("\\", "\\\\").replace("\"", "\\\"")
        s"""{ "bool": { "filter": { "term": { "logDate": "$escaped" } } } }"""
    }

    /**
      * Executes the given query DSL against the DAU index and returns the result.
      *
      * Fails with a descriptive error when Elasticsearch rejects the request, instead of
      * letting callers hit a NullPointerException on the missing response fields.
      */
    private def executeSearch(queryDSL: String): SearchResult = {
        val search: Search = new Search.Builder(queryDSL)
            .addIndex(GmallConstant.ES_INDEX_DAU)
            .addType("_doc")
            .build()
        val result: SearchResult = jestClient.execute(search)
        if (!result.isSucceeded) {
            throw new IllegalStateException(
                s"Elasticsearch search on index ${GmallConstant.ES_INDEX_DAU} failed: ${result.getErrorMessage}")
        }
        result
    }
}
Copyright © 尚硅谷大数据 2019 all rights reserved, powered by GitBook
该文件最后修订时间: 2019-10-08 23:56:19

results matching ""

    No results matching ""