Listing leaf files and directories
Let's analyze how the listing is parallelized. The relevant code lives in `org.apache.spark.util.HadoopFSUtils`:
```scala
sc.parallelize(paths, numParallelism)
  .mapPartitions { pathsEachPartition =>
    val hadoopConf = serializableConfiguration.value
    pathsEachPartition.map { path =>
      val leafFiles = listLeafFiles(
        path = path,
        hadoopConf = hadoopConf,
        filter = filter,
        contextOpt = None, // Can't execute parallel scans on workers
        ignoreMissingFiles = ignoreMissingFiles,
        ignoreLocality = ignoreLocality,
        isRootPath = isRootLevel,
        parallelismThreshold = Int.MaxValue,
        parallelismMax = 0)
      (path, leafFiles)
    }
  }.collect()
```
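The pattern is: turn the root paths themselves into an RDD, let each task recursively list its share of paths on an executor, and collect the results back to the driver. Below is a minimal, self-contained sketch of that pattern under stated simplifications (the function name is hypothetical; each task builds a fresh Hadoop `Configuration` instead of shipping the driver's via a serializable wrapper, and leaf files are returned as path strings, since the real code first converts each `FileStatus` into a serializable form before collecting):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Sketch only: distribute root paths to executors, walk each one
// recursively there, collect the leaf-file paths back to the driver.
def parallelListLeafFilesSketch(
    spark: SparkSession,
    paths: Seq[String],
    parallelismMax: Int = 10000): Seq[(String, Seq[String])] = {
  val sc = spark.sparkContext
  // Same clamping as in HadoopFSUtils: never more tasks than paths
  // (with a floor of 1 to guard against an empty input in this sketch).
  val numParallelism = math.min(paths.size, parallelismMax).max(1)
  sc.parallelize(paths, numParallelism)
    .mapPartitions { iter =>
      // Built fresh per task here; the real code ships the driver's
      // configuration via a serializable wrapper instead.
      val conf = new Configuration()
      iter.map { p =>
        val root = new Path(p)
        val fs = root.getFileSystem(conf)
        // Plain recursive walk; the real listLeafFiles also handles path
        // filters, missing files, and block-locality information.
        def leaves(dir: Path): Seq[String] = {
          val (dirs, files) = fs.listStatus(dir).toSeq.partition(_.isDirectory)
          files.map(_.getPath.toString) ++ dirs.flatMap(d => leaves(d.getPath))
        }
        (p, leaves(root))
      }
    }
    .collect()
    .toSeq
}
```

Using `mapPartitions` keeps the per-task setup (building the `Configuration`) to once per partition rather than once per path, which mirrors why the real code reads `serializableConfiguration.value` at the partition level.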
```scala
// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelismMax)
```
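To make the clamping concrete: with 100 root paths and the default cap of 10,000, numParallelism is 100 (one task per path); with 50,000 root paths it is capped at 10,000, so each listing task handles five paths on average.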
parallelismMax is ultimately determined by the following configuration entry:
```scala
val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
  buildConf("spark.sql.sources.parallelPartitionDiscovery.parallelism")
    .doc("The number of parallelism to list a collection of path recursively, Set the " +
      "number to prevent file listing from generating too many tasks.")
    .version("2.1.1")
    .internal()
    .intConf
    .createWithDefault(10000)
```
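The entry is marked `internal()`, so it does not appear in the documented configuration list, but it can still be overridden like any other SQL conf. A minimal sketch (the values 1000 and 500 are just for illustration):

```scala
import org.apache.spark.sql.SparkSession

// Cap parallel file listing at 1000 tasks for this session.
val spark = SparkSession.builder()
  .appName("listing-parallelism-demo")
  .master("local[*]")
  .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "1000")
  .getOrCreate()

// Runtime SQL confs can also be changed after startup.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "500")
```

Note that the related `spark.sql.sources.parallelPartitionDiscovery.threshold` (default 32) decides whether listing is distributed at all: only when the number of paths exceeds that threshold does Spark fall back from driver-side listing to the parallel job shown above.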