当前位置: 首页> 游戏> 网游 > 详解 Spark 编程之 RDD 依赖关系

详解 Spark 编程之 RDD 依赖关系

时间:2025/7/11 8:46:36来源:https://blog.csdn.net/weixin_44480009/article/details/139373988 浏览次数:0次

一、依赖与血缘关系

在这里插入图片描述

  • 依赖:两个相邻 RDD 之间的关系
  • 血缘关系:多个连续的 RDD 的依赖
  • 由于 RDD 不会保存数据,为了提高容错性,每个 RDD 都会保存自己的血缘关系,一旦某个转换过程出现错误,可以根据血缘关系重新从数据源开始读取计算
object TestRDDDependency {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Dep")val sc = new SparkContext(conf)val rdd1 = sc.textFile("data/word.txt")println(rdd1.toDebugString) // 打印血缘关系println(rdd1.dependencies) // 打印依赖关系println("----------------------")val rdd2 = rdd1.flatMap(_.split(" "))println(rdd2.toDebugString) // 打印血缘关系println(rdd2.dependencies) // 打印依赖关系println("----------------------")val rdd3 = rdd2.map((_, 1))println(rdd3.toDebugString) // 打印血缘关系println(rdd3.dependencies) // 打印依赖关系println("----------------------")val rdd4 = rdd3.reduceByKey(_ + _)println(rdd4.toDebugString) // 打印血缘关系println(rdd4.dependencies) // 打印依赖关系println("----------------------")}
}

二、宽窄依赖

  • 窄依赖:OneToOneDependency,表示每一个父 (上游) RDD 的 Partition 最多被子 (下游) RDD 的一个 Partition 使用,类比喻为独生子女

    class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
    
  • 宽依赖:ShuffleDependency,表示同一个父 (上游) RDD 的 Partition 被子 (下游) RDD 的多个 Partition 依赖或者说子 RDD 的一个 Partition 需要父 RDD 的多个 Partition 的数据,所以会引起 Shuffle 操作,类比喻为多生

    class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false
    ) extends Dependency[Product2[K, V]] 
    

三、阶段划分

  • 窄依赖由于上游和下游的 RDD 分区是一对一的,所以整个的执行过程是不受其它分区执行结果的影响,每个分区只需要一个 task 就可以完成计算任务

在这里插入图片描述

  • 宽依赖由于存在 shuffle 操作,下游的 RDD 分区的数据计算需要等待上游 RDD 相关分区的数据全部执行完成后才能开始,所以存在不同阶段的划分,上游和下游 RDD 的每个分区都需要一个 task 来完成计算任务,所有阶段的划分和执行顺序可以由有向无环图 (DAG) 的形式来表示
    在这里插入图片描述

  • 阶段划分源码:

    /**结论:1.默认会至少存在一个阶段,即 resultStage,最后执行的阶段2.当存在 shuffle 依赖时,每存在一个会增加一个阶段(shuffleMapStage)3.阶段的数量 = shuffle 依赖数量 + 1
    */
    // 行动算子触发作业执行
    rdd.collect()// collect() 深入底层
    dagScheduler.runJob()// runJob() 中会调用 submitJob(),其中会调用 handleJobSubmitted()
    // handleJobSubmitted() 中的阶段划分
    try {finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {...
    }// createResultStage() 方法
    private def createResultStage(rdd: RDD[_],func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], jobId: Int, callSite: CallSite): ResultStage = {val parents = getOrCreateParentStages(rdd, jobId) // 判断是否有上一阶段val id = nextStageId.getAndIncrement()val stage = new  ResultStage(id, rdd, func, partitions, parents, jobId,  callSite) // 至少存在一个 resultStage 阶段stageIdToStage(id) = stageupdateJobIdStageIdMaps(jobId, stage)stage
    }// getOrCreateParentStages(),判断是否有上一阶段
    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖getShuffleDependencies(rdd).map { shuffleDep =>// 为 shuffle 依赖创建 ShuffleMapStage 阶段getOrCreateShuffleMapStage(shuffleDep, firstJobId)}.toList
    }// getShuffleDependencies(rdd):获取当前 rdd 的 shuffle 依赖
    private[scheduler] def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {val parents = new HashSet[ShuffleDependency[_, _, _]]val visited = new HashSet[RDD[_]]val waitingForVisit = new Stack[RDD[_]]waitingForVisit.push(rdd)while (waitingForVisit.nonEmpty) {val toVisit = waitingForVisit.pop()if (!visited(toVisit)) {visited += toVisittoVisit.dependencies.foreach {case shuffleDep: ShuffleDependency[_, _, _] =>parents += shuffleDepcase dependency =>waitingForVisit.push(dependency.rdd)}}}parents
    }
    

四、任务划分

  • RDD 任务划分中间分为:Application、Job、Stage 和 Task

    • Application:初始化一个 SparkContext 即生成一个 Application
    • Job:一个 Action 算子就会生成一个 Job
    • Stage:Stage 等于宽依赖 (ShuffleDependency) 的个数加 1
    • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
  • Application -> Job -> Stag e-> Task 之间每一层都是 1 对 n 的关系

  • 任务划分源码:

    val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage => partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary,  part,  locs,  stage.latestInfo.taskMetrics,  properties, Option(jobId),Option(sc.applicationId), sc.applicationAttemptId)}case stage: ResultStage => partitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)}}
    }//
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()//
    override def findMissingPartitions(): Seq[Int] = {mapOutputTrackerMaster.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
    }
    
关键字:详解 Spark 编程之 RDD 依赖关系

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: