
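Spark Worker Workflow and Driver Launch: A Source Code Walkthrough

Spark Worker internals and source code analysis: the Worker workflow and the source code that launches the Driver
Method / Steps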
1

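How a Worker launches the Driver and the Executor

The core role of a Worker is to manage the memory, CPU, and other resources of its own machine, and to launch a Driver or an Executor when the Master instructs it to. The steps below trace how the Driver is launched and then how the Executor is launched. If a Driver or an Executor dies, the Master can allocate resources again by calling schedule.

At runtime the Worker is a standalone process. It takes part in RPC communication by extending ThreadSafeRpcEndpoint:

    extends ThreadSafeRpcEndpoint with Logging {

The Master sends messages to the Worker over RPC, and the Worker handles them in its receive method:

    case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir, // working directory
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      // Start the DriverRunner
      driver.start()
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
    }

The Worker tracks each DriverRunner by its driver ID. DriverRunner starts a separate thread internally, and that thread launches and manages the Process in which the Driver itself runs. The mapping from driver ID to DriverRunner is kept in a HashMap:

    val drivers = new HashMap[String, DriverRunner]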

2

DriverRunner manages the execution of a Driver, including restarting it automatically when it fails. This applies to standalone cluster deploy mode: when a Driver running in the cluster fails and supervise is true, the Worker that launched the Driver is responsible for restarting it.

    /**
     * Manages the execution of one driver, including automatically restarting the driver on failure.
     * This is currently only used in standalone cluster deploy mode.
     */

3

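start() spawns a thread, and the first thing that thread does is create the Driver's working directory. The excerpt shows only the beginning of the method:

    /** Starts a thread to run and manage the driver. */
    private[worker] def start() = {
      new Thread("DriverRunner for " + driverId) {
        override def run() {
          try {
            val driverDir = createWorkingDirectory()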

4

createWorkingDirectory creates a directory named after the driver ID under the Worker's work directory:

    /**
     * Creates the working directory for this driver.
     * Will throw an exception if there are errors preparing the directory.
     */
    private def createWorkingDirectory(): File = {
      // Create the Driver's working directory
      val driverDir = new File(workDir, driverId)
      if (!driverDir.exists() && !driverDir.mkdirs()) {
        throw new IOException("Failed to create directory " + driverDir)
      }
      driverDir
    }
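The directory check above can be tried outside Spark. The following is a minimal sketch, assuming a temporary directory stands in for the Worker's workDir and a made-up driver ID; WorkDirSketch and its parameters are illustrative names, not part of Spark.

```scala
import java.io.{File, IOException}
import java.nio.file.Files

object WorkDirSketch {
  // Mirrors the check in createWorkingDirectory: create <workDir>/<driverId>
  // unless it already exists, and fail loudly if creation does not succeed.
  def createWorkingDirectory(workDir: File, driverId: String): File = {
    val driverDir = new File(workDir, driverId)
    if (!driverDir.exists() && !driverDir.mkdirs()) {
      throw new IOException("Failed to create directory " + driverDir)
    }
    driverDir
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-ins for the Worker's workDir and a driver ID.
    val workDir = Files.createTempDirectory("worker-sketch").toFile
    val dir = createWorkingDirectory(workDir, "driver-0001")
    println(dir.isDirectory)
  }
}
```

Because the exists() test comes first, calling the method a second time with the same ID returns the same directory instead of failing.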

5

The user code is packaged as a JAR, and the next call downloads that JAR into the working directory:

    val localJarFilename = downloadUserJar(driverDir)

6

downloadUserJar downloads the JAR file and returns its local path. The application is packaged as a JAR and uploaded to HDFS so that every machine can download it from there.

    /**
     * Download the user jar into the supplied directory and return its local path.
     * Will throw an exception if there are errors downloading the jar.
     */
    private def downloadUserJar(driverDir: File): String = {
      val jarPath = new Path(driverDesc.jarUrl) // the JAR is fetched from HDFS
      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
      val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
      val jarFileName = jarPath.getName
      val localJarFile = new File(driverDir, jarFileName)
      val localJarFilename = localJarFile.getAbsolutePath
      if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
        logInfo(s"Copying user jar $jarPath to $destPath")
        Utils.fetchFile(
          driverDesc.jarUrl,
          driverDir,
          conf,
          securityManager,
          hadoopConf,
          System.currentTimeMillis(),
          useCache = false)
      }
      if (!localJarFile.exists()) { // Verify copy succeeded
        throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
      }
      localJarFilename
    }

7

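Some arguments in the command are placeholders at submission time because their values are not yet known. They are filled in when the Driver is actually launched:

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename // already downloaded in the previous step
      case other => other
    }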
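To see the substitution applied to a whole argument list, here is a minimal sketch. The worker URL, JAR path, and main class are made-up values, and SubstituteSketch is an illustrative wrapper that passes workerUrl and localJarFilename explicitly instead of closing over them as the original does.

```scala
object SubstituteSketch {
  // Replaces the two placeholders named in the text; any other argument passes through.
  def substituteVariables(workerUrl: String, localJarFilename: String)(
      argument: String): String =
    argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}"   => localJarFilename
      case other            => other
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical values, chosen only for illustration.
    val substitute: String => String =
      substituteVariables("spark://Worker@host-a:7078", "/tmp/driver-0001/app.jar")
    val template = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.Main", "--verbose")
    println(template.map(substitute).mkString(" "))
  }
}
```

The match is on the whole argument, so a placeholder embedded inside a longer string (for example a prefix followed by {{USER_JAR}}) is left untouched.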

8

The command mainly determines the entry-point class of the process to be started. A ProcessBuilder is built from it and passed to launchDriver:

    // TODO: If we add ability to submit multiple jars they should also be added here
    // driverDesc.command specifies which class to run at launch.
      val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
        driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
      // Launch the Driver
      launchDriver(builder, driverDir, driverDesc.supervise)
    }

9

The source of launchDriver follows. It redirects stdout and stderr to files under baseDir, so the logs of earlier runs can be inspected afterwards.

    private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
      builder.directory(baseDir)
      def initialize(process: Process): Unit = {
        // Redirect stdout and stderr to files
        val stdout = new File(baseDir, "stdout")
        CommandUtils.redirectStream(process.getInputStream, stdout)
        val stderr = new File(baseDir, "stderr")
        // Format the command for the log header
        val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
        val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
        Files.append(header, stderr, UTF_8)
        CommandUtils.redirectStream(process.getErrorStream, stderr)
      }
      runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
    }
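The header-then-stream layout of the stderr file can be reproduced with plain JDK calls. This is a minimal sketch under stated assumptions: RedirectSketch and writeWithHeader are hypothetical names, and the copy runs synchronously for simplicity, which says nothing about how CommandUtils.redirectStream is implemented.

```scala
import java.io.{File, FileOutputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

object RedirectSketch {
  // Writes a "Launch Command" header to `target`, then appends everything
  // read from `stream` (standing in for a process's error stream).
  def writeWithHeader(command: Seq[String], stream: InputStream, target: File): Unit = {
    val formatted = command.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formatted, "=" * 40)
    Files.write(target.toPath, header.getBytes(UTF_8))
    val out = new FileOutputStream(target, true) // append after the header
    try {
      val buf = new Array[Byte](4096)
      var n = stream.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = stream.read(buf)
      }
    } finally {
      out.close()
    }
  }
}
```

Quoting each argument separately in the header keeps arguments that contain spaces distinguishable when reading the log later.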

10

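The ProcessBuilderLike companion object provides a factory method. Its apply method wraps a real ProcessBuilder in an anonymous ProcessBuilderLike whose start and command delegate to it:

    private[deploy] object ProcessBuilderLike {
      // apply returns an implementation whose start() delegates to the wrapped ProcessBuilder
      def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
        override def start(): Process = processBuilder.start()
        override def command: Seq[String] = processBuilder.command().asScala
      }
    }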

11

The ProcessBuilderLike trait itself is as follows:

    // Needed because ProcessBuilder is a final class and cannot be mocked
    private[deploy] trait ProcessBuilderLike {
      def start(): Process
      def command: Seq[String]
    }
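The comment on the trait explains its purpose: a test can supply its own implementation instead of a real ProcessBuilder. The sketch below illustrates that idea. FakeProcess and FakeBuilder are hypothetical names invented here, and the trait is redeclared locally so the example stands alone.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

object FakeProcessSketch {
  // Same shape as the trait in the text, redeclared so the sketch is self-contained.
  trait ProcessBuilderLike {
    def start(): Process
    def command: Seq[String]
  }

  // A stand-in Process that never touches the OS and exits with a fixed code.
  class FakeProcess(code: Int) extends Process {
    override def getOutputStream(): OutputStream = new ByteArrayOutputStream()
    override def getInputStream(): InputStream = new ByteArrayInputStream(new Array[Byte](0))
    override def getErrorStream(): InputStream = new ByteArrayInputStream(new Array[Byte](0))
    override def waitFor(): Int = code
    override def exitValue(): Int = code
    override def destroy(): Unit = ()
  }

  // A fake builder that records how many times start() was called.
  class FakeBuilder(code: Int) extends ProcessBuilderLike {
    var starts = 0
    override def start(): Process = { starts += 1; new FakeProcess(code) }
    override def command: Seq[String] = Seq("fake-driver")
  }
}
```

With such a fake, code written against ProcessBuilderLike can be exercised for a chosen exit code without starting a JVM child process.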

12

runCommandWithRetry takes the ProcessBuilderLike interface, starts the process, and blocks until it exits. When supervise is true and the process exits with a nonzero code, it relaunches the process after an exponentially growing wait.

    // Takes the ProcessBuilderLike interface
    def runCommandWithRetry(
        command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = {
      // Time to wait between submission retries.
      var waitSeconds = 1
      // A run of this many seconds resets the exponential back-off.
      val successfulRunDuration = 5
      var keepTrying = !killed
      while (keepTrying) {
        logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
        synchronized {
          if (killed) { return }
          // Call start() on the ProcessBuilderLike to launch the Driver process
          process = Some(command.start())
          initialize(process.get)
        }
        val processStart = clock.getTimeMillis()
        // waitFor() blocks until the Driver process exits and returns its exit code
        val exitCode = process.get.waitFor()
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        if (supervise && exitCode != 0 && !killed) {
          logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
          sleeper.sleep(waitSeconds)
          waitSeconds = waitSeconds * 2 // exponential back-off
        }
        keepTrying = supervise && exitCode != 0 && !killed
        finalExitCode = Some(exitCode)
      }
    }
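The back-off schedule is easier to follow with the process replaced by a function that returns an exit code. The following is a simplified sketch, not Spark's code: RetrySketch is a hypothetical name, the killed flag is left out, and the sleep and clock functions are passed in so that no real delay occurs.

```scala
object RetrySketch {
  // Runs `launch` (which blocks and returns an exit code) until it succeeds or
  // supervision is off, doubling the wait after each failure. `sleep` and
  // `nowMillis` are injected so the loop can run without real delays.
  def runWithRetry(
      launch: () => Int,
      supervise: Boolean,
      sleep: Int => Unit,
      nowMillis: () => Long = () => System.currentTimeMillis()): Int = {
    var waitSeconds = 1
    val successfulRunDuration = 5 // seconds; a longer run resets the back-off
    var exitCode = 0
    var keepTrying = true
    while (keepTrying) {
      val started = nowMillis()
      exitCode = launch()
      if (nowMillis() - started > successfulRunDuration * 1000) {
        waitSeconds = 1
      }
      keepTrying = supervise && exitCode != 0
      if (keepTrying) {
        sleep(waitSeconds)
        waitSeconds *= 2 // exponential back-off
      }
    }
    exitCode
  }

  def main(args: Array[String]): Unit = {
    // Simulated driver that fails three times and then exits cleanly.
    val exitCodes = Iterator(1, 1, 1, 0)
    val waits = scala.collection.mutable.ArrayBuffer.empty[Int]
    val result = runWithRetry(() => exitCodes.next(), supervise = true,
      sleep = s => { waits += s; () })
    println(s"final exit code $result, waits ${waits.mkString(", ")}")
  }
}
```

In this run the recorded waits are 1, 2, and 4 seconds before the fourth attempt succeeds. With supervise set to false the function returns after a single attempt, whatever the exit code.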

13

When the Driver's state changes, the DriverRunner sends a DriverStateChanged message to its own Worker:

    worker.send(DriverStateChanged(driverId, state, finalException))

14

On the Worker side, receive matches the message and hands it to handleDriverStateChanged:

    case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
      // Handle the Driver state change
      handleDriverStateChanged(driverStateChanged)
    }

15

handleDriverStateChanged logs the new state and then forwards the message to the Master. The excerpt ends at the forwarding call:

    private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
      val driverId = driverStateChanged.driverId
      val exception = driverStateChanged.exception
      val state = driverStateChanged.state
      state match {
        case DriverState.ERROR =>
          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
        case DriverState.FAILED =>
          logWarning(s"Driver $driverId exited with failure")
        case DriverState.FINISHED =>
          logInfo(s"Driver $driverId exited successfully")
        case DriverState.KILLED =>
          logInfo(s"Driver $driverId was killed by user")
        case _ =>
          logDebug(s"Driver $driverId changed state to $state")
      }
      // Tell the Master that the Driver's state has changed.
      sendToMaster(driverStateChanged)

16

On the Master side, the receive method handles messages sent by the Worker and acts on the Driver's state. Every terminal state leads to removeDriver:

    case DriverStateChanged(driverId, state, exception) => {
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    }

17

removeDriver removes the Driver from the Master's own data structures, moves it to the list of completed Drivers, and reschedules:

    private def removeDriver(
        driverId: String,
        finalState: DriverState,
        exception: Option[Exception]) {
      drivers.find(d => d.id == driverId) match {
        case Some(driver) =>
          logInfo(s"Removing driver: $driverId")
          drivers -= driver
          if (completedDrivers.size >= RETAINED_DRIVERS) {
            val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
            completedDrivers.trimStart(toRemove)
          }
          completedDrivers += driver
          // Remove the Driver from the persistence engine (for example, data persisted in ZooKeeper).
          persistenceEngine.removeDriver(driver)
          driver.state = finalState
          driver.exception = exception
          driver.worker.foreach(w => w.removeDriver(driver))
          // Resources have changed, so run schedule again
          schedule()
        case None =>
          logWarning(s"Asked to remove unknown driver: $driverId")
      }
    }
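The trimming of completedDrivers in removeDriver is a small bounded-history pattern that can be shown on its own. This is a minimal sketch with a hypothetical helper, RetainedSketch.addCompleted. It uses remove(0, n) on an ArrayBuffer, which has the same effect as trimStart(n) as long as the limit is at least 1.

```scala
import scala.collection.mutable.ArrayBuffer

object RetainedSketch {
  // Appends `item` to `completed`, first dropping the oldest entries when the
  // buffer has reached `retained` (assumed >= 1). Each trim drops about 10% of
  // the limit, and at least one entry, so trimming does not run on every append.
  def addCompleted[A](completed: ArrayBuffer[A], item: A, retained: Int): Unit = {
    if (completed.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      completed.remove(0, toRemove)
    }
    completed += item
  }

  def main(args: Array[String]): Unit = {
    // Keep at most 20 entries while recording 25 completed IDs.
    val completed = ArrayBuffer.empty[Int]
    (1 to 25).foreach(id => addCompleted(completed, id, retained = 20))
    println(s"${completed.size} retained, oldest is ${completed.head}")
  }
}
```

With a limit of 20, each trim removes two entries, so after 25 additions the buffer holds 19 entries and the oldest surviving ID is 7.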

18

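LaunchExecutor: the Worker first checks whether the masterUrl in the message matches activeMasterUrl, and logs a warning if it does not. The excerpt shows only that check:

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")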

19

Create the Executor's working directory:

    // Create the executor's working directory
    val executorDir = new File(workDir, appId + "/" + execId)

20

Create and start the ExecutorRunner:

    val manager = new ExecutorRunner(
      appId,
      execId,
      appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
      cores_,
      memory_,
      self,
      workerId,
      host,
      webUi.boundPort,
      publicAddress,
      sparkHome,
      executorDir,
      workerUri,
      conf,
      appLocalDirs, ExecutorState.RUNNING)
    executors(appId + "/" + execId) = manager
    manager.start()

21

The start() method creates a thread that launches the Executor through fetchAndRunExecutor. The excerpt shows only the thread definition:

    private[worker] def start() {
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run() { fetchAndRunExecutor() }
      }

22

The source of fetchAndRunExecutor follows. The excerpt ends where the exit status is reported to the Worker:

    /**
     * Download and run the executor described in our ApplicationDescription
     */
    private def fetchAndRunExecutor() {
      try {
        // Launch the process
        val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
          memory, sparkHome.getAbsolutePath, substituteVariables)
        val command = builder.command()
        val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
        logInfo(s"Launch command: $formattedCommand")
        builder.directory(executorDir)
        builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
        // In case we are running this from within the Spark Shell, avoid creating a "scala"
        // parent process for the executor command
        builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
        // Add webUI log urls
        val baseUrl =
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
        builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
        process = builder.start()
        val header = "Spark Executor Command: %s\n%s\n\n".format(
          formattedCommand, "=" * 40)
        // Redirect its stdout and stderr to files
        val stdout = new File(executorDir, "stdout")
        stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
        val stderr = new File(executorDir, "stderr")
        Files.write(header, stderr, UTF_8)
        stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
        // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
        // or with nonzero exit code
        val exitCode = process.waitFor()
        state = ExecutorState.EXITED
        val message = "Command exited with code " + exitCode
        // When the executor's state changes, send a message to the Worker.
        worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))

23

The Worker sends the state message on to the Master:

    sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))

24

When the Master handles the message, it also notifies the application's Driver. The excerpt ends at that notification:

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {
          val appInfo = idToApp(appId)
          val oldState = exec.state
          exec.state = state
          if (state == ExecutorState.RUNNING) {
            assert(oldState == ExecutorState.LAUNCHING,
              s"executor $execId state transfer from $oldState to RUNNING is illegal")
            appInfo.resetRetryCount()
          }
          // Tell the Driver that the Executor's state has changed.
          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
