6.3 创建操作 ES 的工具类

由于多个子模块都会操作 ES, 所以, 我们把这个工具类定义在gmall-common包下.

步骤 1: 在gmall-common模块中添加对 ES 的依赖

<!--es 相关依赖开始-->
<dependency>
    <groupId>io.searchbox</groupId>
    <artifactId>jest</artifactId>
    <version>6.3.1</version>
</dependency>

<dependency>
    <groupId>net.java.dev.jna</groupId>
    <artifactId>jna</artifactId>
    <version>4.5.2</version>
</dependency>

<dependency>
    <groupId>org.codehaus.janino</groupId>
    <artifactId>commons-compiler</artifactId>
    <version>2.7.8</version>
</dependency>
<!-- es 相关依赖结束 -->

步骤 2: MyESUtil工具类源码

package com.atguigu.dw.gmall.common.util

import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, DocumentResult, Index}

import scala.collection.mutable

/**
  * Author lzc
  * Date 2019/5/15 6:50 PM
  */
object MyESUtil {
    private val ES_HOST = "http://hadoop201"
    private val ES_HTTP_PORT = 9200
    private var factory: JestClientFactory = _

    /**
      * 构建客户端工厂对象
      */
    def buildFactory(): Unit = {
        val config: HttpClientConfig = new HttpClientConfig.Builder(s"$ES_HOST:$ES_HTTP_PORT")
            .multiThreaded(true)
            .maxTotalConnection(20)
            .connTimeout(10000)
            .readTimeout(10000)
            .build()
        factory = new JestClientFactory()
        factory.setHttpClientConfig(config)
    }

    /**
      * 获取客户端对象
      *
      * @return
      */
    def getClient(): JestClient = {
        if (factory == null) buildFactory()
        factory.getObject
    }

    /**
      * 关闭客户端
      *
      * @param client
      */
    def closeClient(client: JestClient) = {
        if (client != null) {
            try {
                client.shutdownClient()
            } catch {
                case e => e.printStackTrace()
            }
        }
    }

    /**
      * 批量插入数据
      * 插入的时候保证至少传一个 source 进来
      *
      * @param index
      * @param doc
      * @param source
      * @param otherSource
      */
    def insertBulk(index: String, sources: Iterable[Any]) = {
        val bulkBuilder = new Bulk.Builder().defaultIndex(index).defaultType("_doc")
        sources.foreach(any => {
            bulkBuilder.addAction(new Index.Builder(any).build())
        })
        val client: JestClient = getClient()
        client.execute(bulkBuilder.build())
        closeClient(client)
    }
    def main(args: Array[String]): Unit = {
        /*val client: JestClient = getClient()
        //        singleOperation(client)
        multiOperation(client)
        closeClient(client)*/


        val source1 = Startup("mid777111", "uid222", "", "", "", "", "", "", "", "", "", 123124141)
        val source2 = Startup("mid888222", "uid333", "", "", "", "", "", "", "", "", "", 123124141)
        insertBulk("gmall_dau", source1::source2::Nil)
    }

    /**
      * 批量操作测试
      *
      * @param client
      * @return
      */
    def multiOperation(client: JestClient) = {
        val source1 = Startup("mid777", "uid222", "", "", "", "", "", "", "", "", "", 123124141)
        val source2 = Startup("mid888", "uid333", "", "", "", "", "", "", "", "", "", 123124141)
        val bulkBuilder = new Bulk.Builder().defaultIndex("gmall_dau").defaultType("_doc")
        bulkBuilder.addAction(new Index.Builder(source1).build())
        bulkBuilder.addAction(new Index.Builder(source2).build())
        client.execute(bulkBuilder.build())
    }

    /**
      * 单次操作测试
      *
      * @param client
      * @return
      */
    private def singleOperation(client: JestClient): DocumentResult = {
        // 测试
        // 1. 写入操作(单个)   index? type?  内容?
        // 1.1 插入json 格式的字符串
        val source1 =
        """
          |{
          |  "mid": "mid_1234",
          |  "uid": "123"
          |}
        """.stripMargin
        // 1.2 插入样例类
        val source2 = Startup("mid777", "uid222", "", "", "", "", "", "", "", "", "", 123124141)
        val index: Index = new Index.Builder(source2)
            .index("gmall_dau")
            .`type`("_doc")
            .build()
        client.execute(index)
    }
}

case class Startup(mid: String,
                   uid: String,
                   appid: String,
                   area: String,
                   os: String,
                   ch: String,
                   logType: String,
                   vs: String,
                   var logDate: String,
                   var logHour: String,
                   var logHourMinute: String,
                   var ts: Long
                  ) {

}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-10-08 23:56:19

results matching ""

    No results matching ""