2.3.2 Worker 启动源码分析

Worker源码分析

org.apache.spark.deploy.worker.Worker

Worker伴生对象

启动流程与 Master 基本一致：`main` 解析命令行参数后创建 RpcEnv，并在其中注册 Worker 这个 RpcEndpoint。

/**
 * Companion object of the Worker: the process entry point plus a factory that
 * creates the worker's RpcEnv and registers the Worker endpoint in it.
 */
private[deploy] object Worker extends Logging {
    // Base name of the worker's RpcEnv; an optional worker number is appended
    // (see startRpcEnvAndEndpoint).
    val SYSTEM_NAME: String = "sparkWorker"
    // Name under which the Worker endpoint is registered in its RpcEnv.
    val ENDPOINT_NAME: String = "Worker"

    /**
     * Entry point of the Worker process.
     *
     * Parses the command-line arguments, starts the RpcEnv hosting the Worker
     * endpoint, then blocks until that RpcEnv terminates.
     *
     * @param argStrings raw command-line arguments, parsed by WorkerArguments
     */
    def main(argStrings: Array[String]): Unit = {
        Utils.initDaemon(log)
        val conf = new SparkConf
        // Parse the command-line arguments (the same conf is handed to the parser).
        val args = new WorkerArguments(argStrings, conf)
        // Start the RpcEnv and the Worker endpoint. workerNumber keeps its default
        // (None), which is why conf has to be passed by name.
        val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
            args.memory, args.masters, args.workDir, conf = conf)
        // Keep the main thread alive until the RpcEnv shuts down.
        rpcEnv.awaitTermination()
    }

    /**
     * Creates the worker's RpcEnv and registers a new Worker endpoint in it.
     *
     * @param host         host the RpcEnv binds to
     * @param port         RPC port the RpcEnv binds to (NOT the web UI port)
     * @param webUiPort    port of the Worker's web UI, passed on to the Worker
     * @param cores        number of cores this worker offers
     * @param memory       amount of memory this worker offers (presumably in MB — confirm
     *                     against WorkerArguments)
     * @param masterUrls   Master URLs in Spark URL form; each is parsed with
     *                     RpcAddress.fromSparkURL
     * @param workDir      work directory handed to the Worker
     * @param workerNumber optional suffix appended to the RpcEnv name so that several
     *                     workers in one JVM get distinct names
     * @param conf         Spark configuration, also used to build the SecurityManager
     * @return the RpcEnv hosting the Worker endpoint
     */
    def startRpcEnvAndEndpoint(
                                  host: String,
                                  port: Int,
                                  webUiPort: Int,
                                  cores: Int,
                                  memory: Int,
                                  masterUrls: Array[String],
                                  workDir: String,
                                  workerNumber: Option[Int] = None,
                                  conf: SparkConf = new SparkConf): RpcEnv = {

        // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
        val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
        val securityMgr = new SecurityManager(conf)
        // Create the RpcEnv, e.g. systemName = "sparkWorker", host from the command line,
        // port = the RPC port. The web UI port is a separate value (webUiPort) and is
        // only handed to the Worker below.
        val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
        // Convert the Master URLs passed on the command line into RpcAddresses.
        val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
        // Instantiate the Worker and register it as this RpcEnv's endpoint.
        rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
            masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
        rpcEnv
    }
}

Worker伴生类

onStart 方法

/**
 * Lifecycle hook of the Worker endpoint.
 *
 * Prepares the local environment (work directory, shuffle service, web UI) and
 * then starts registering this Worker with the Master(s).
 */
override def onStart(): Unit = {
    // On first start the Worker must not be registered yet.
    assert(!registered)
    // Create the work directory.
    createWorkDir()
    // Start the shuffle service (presumably a no-op unless enabled in conf — confirm).
    shuffleService.startIfEnabled()
    // Create the Worker's web UI and bind it.
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()

    // Must be assigned before registerWithMaster(): the RegisterWorker message that
    // is sent to the Master includes workerWebUiUrl.
    workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
    // Register this Worker with the Master(s).
    registerWithMaster()
}

registerWithMaster() 方法（无参版本，由 onStart 调用）

关键代码:

// Start one registration attempt per known Master (tryRegisterAllMasters) and keep
// the returned futures in registerMasterFutures (presumably so pending attempts can
// be cancelled later — confirm).
registerMasterFutures = tryRegisterAllMasters()

tryRegisterAllMasters() 方法

/**
 * Starts one asynchronous registration attempt per configured Master address.
 *
 * Each attempt runs on registerMasterThreadPool. It resolves an RpcEndpointRef for
 * the Master endpoint and then sends a RegisterWorker request via
 * registerWithMaster(masterEndpoint).
 *
 * @return one JFuture per entry of masterRpcAddresses, in the same order
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    import scala.util.control.NonFatal

    masterRpcAddresses.map { masterAddress =>
        // Run each attempt on the pool so one unreachable Master does not hold up
        // the attempts for the others (subject to the pool's size).
        registerMasterThreadPool.submit(new Runnable {
            override def run(): Unit = {
                try {
                    // Get a reference to the Master endpoint so we can talk to it.
                    val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                    // Send the registration request to this Master.
                    registerWithMaster(masterEndpoint)
                } catch {
                    // An interrupt presumably means this attempt was cancelled through
                    // its future; nothing to report.
                    case _: InterruptedException =>
                    // An exception escaping run() would only be captured in the pool's
                    // Future and never seen, so log the failure explicitly.
                    case NonFatal(e) =>
                        logWarning(s"Failed to connect to master $masterAddress", e)
                }
            }
        })
    }
}

registerWithMaster(masterEndpoint) 方法（向单个 Master 发送注册请求）

/**
 * Sends one RegisterWorker request to the given Master and handles the reply
 * asynchronously.
 *
 * The request carries this worker's id, host, port, its own endpoint ref (self),
 * cores, memory and web UI URL. A successful reply is passed to
 * handleRegisterResponse, wrapped in Utils.tryLogNonFatalError. If the ask itself
 * fails, the error is logged and the process exits with status 1.
 *
 * @param masterEndpoint reference to the Master endpoint to register with
 */
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    // The message handled by the Master's receiveAndReply.
    val request = RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl)
    val pendingReply = masterEndpoint.ask[RegisterWorkerResponse](request)
    // The callback is very cheap, so it is run via ThreadUtils.sameThread.
    pendingReply.onComplete {
        case Failure(cause) =>
            // Registration could not be completed at all: log and exit.
            logError(s"Cannot register with master: ${masterEndpoint.address}", cause)
            System.exit(1)
        case Success(response) =>
            // The Master answered; let handleRegisterResponse interpret the reply.
            Utils.tryLogNonFatalError {
                handleRegisterResponse(response)
            }
    }(ThreadUtils.sameThread)
}

Master 的 receiveAndReply 方法

// Master side: handle a Worker's registration request. Every reply goes back to the
// Worker's ask (see registerWithMaster(masterEndpoint)).
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    if (state == RecoveryState.STANDBY) {
        // A standby Master does not accept registrations.
        context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
        // A worker with this id is already known.
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
        // Wrap the data sent by the Worker into a WorkerInfo.
        val candidate = new WorkerInfo(id, workerHost, workerPort, cores, memory,
            workerRef, workerWebUiUrl)
        if (!registerWorker(candidate)) {
            // registerWorker rejected it; report the worker's address back.
            val workerAddress = candidate.endpoint.address
            context.reply(RegisterWorkerFailed(
                "Attempted to re-register worker at same address: " + workerAddress))
        } else {
            // Accepted: persist the worker, confirm to the Worker, then call schedule().
            persistenceEngine.addWorker(candidate)
            context.reply(RegisteredWorker(self, masterWebUiUrl))
            schedule()
        }
    }

Worker 的 handleRegisterResponse 方法

case RegisteredWorker(masterRef, masterWebUiUrl) =>
    logInfo(s"Successfully registered with master ${masterRef.address.toSparkURL}")
    // Remember that registration has succeeded.
    registered = true
    // Switch to the Master that accepted us, together with its web UI URL.
    changeMaster(masterRef, masterWebUiUrl)
    // Periodically post SendHeartbeat to this endpoint itself; the SendHeartbeat
    // handler in receive then sends a Heartbeat to the Master. The period is
    // HEARTBEAT_MILLIS, defined elsewhere (upstream Spark uses a quarter of the
    // worker timeout, i.e. ~4 beats per minute by default — confirm).
    val heartbeatTick = new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(SendHeartbeat)
        }
    }
    forwordMessageScheduler.scheduleAtFixedRate(
        heartbeatTick, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)

Worker 的 receive 方法:

case SendHeartbeat =>
    // Forward a Heartbeat (worker id + this endpoint's ref) to the Master, but only
    // while the Worker is connected; otherwise the tick is dropped.
    if (connected) sendToMaster(Heartbeat(workerId, self))

Master 的 receive 方法:

// Master side: handle a Worker heartbeat. `worker` is the sender's endpoint ref
// (the Worker sends Heartbeat(workerId, self)).
case Heartbeat(workerId, worker) =>
    idToWorker.get(workerId) match {
        case Some(workerInfo) =>
            // Record the time of this Worker's latest heartbeat.
            workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
            // Without this branch a heartbeat from an unknown id would throw a
            // MatchError inside the Master's message handling.
            if (workers.map(_.id).contains(workerId)) {
                logWarning(s"Got heartbeat from unregistered worker $workerId." +
                    " Asking it to re-register.")
                worker.send(ReconnectWorker(masterUrl))
            } else {
                logWarning(s"Got heartbeat from unregistered worker $workerId." +
                    " This worker was never registered, so ignoring the heartbeat.")
            }
    }

至此，Worker 启动完成。

Copyright © 尚硅谷大数据 2019 all rights reserved, powered by Gitbook
该文件最后修订时间: 2019-06-11 12:34:39

results matching ""

    No results matching ""