1. Read the local file and convert it to a map
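The snippets below use fastjson for the JSON parsing plus the Spark SQL row and type classes; a minimal set of imports (a sketch, assuming fastjson is on the classpath and an existing SparkSession named spark) would be:

import java.util
import scala.io.Source
import scala.collection.JavaConverters._
import com.alibaba.fastjson.JSON
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}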
// Read the whole file into one string and strip spaces
val path = "local file path"
val source = Source.fromFile(path).getLines().toList.mkString("").replaceAll(" ", "")

// The top-level JSON carries a "key" array and a "columns" object (see the data format note below)
val key = JSON.parseObject(source).get("key").toString
val columns = JSON.parseObject(source).get("columns").toString

// Collect the row key plus every column value into a Java HashMap
val map = new util.HashMap[String, String]()
map.put("RK", getValue(key))
JSON.parseObject(columns).keySet().asScala.foreach(elem => {
  val valueJson = JSON.parseObject(columns).get(elem).toString
  map.put(elem, getValue(valueJson))
})

// Strip the [ ] around the single-element array and pull out its "value" field
def getValue(str: String): String = {
  val value = str.replace("[", "").replace("]", "")
  JSON.parseObject(value).get("value").toString
}
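With the sample data shown in the note at the end, the map at this point would hold the row key plus one entry per column (a sketch; HashMap iteration order is not guaranteed):

println(map)
// e.g. {RK=1234567890, column_name3=333, column_name2=222, column_name1=111}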
2. Convert the map into an RDD and build the DataFrame
// Build the schema from the map keys: every field is a nullable string
val schema = StructType(map.asScala.toSeq.map {
  case (k, v) => StructField(k, StringType, nullable = true)
})

// One Row of the map values, parallelized into an RDD, then turned into a DataFrame
// (keys and values come from the same HashMap, so they line up in the same iteration order)
val row = Row.fromSeq(map.values().asScala.toSeq)
val rowRDD = spark.sparkContext.parallelize(Seq(row))
val df = spark.createDataFrame(rowRDD, schema)
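For the same sample data the DataFrame ends up with a single row; a quick check might print something like this (a sketch, and the column order follows the HashMap's iteration order, so it can differ):

df.printSchema()
df.show()
// +----------+------------+------------+------------+
// |        RK|column_name1|column_name2|column_name3|
// +----------+------------+------------+------------+
// |1234567890|         111|         222|         333|
// +----------+------------+------------+------------+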
Note: data format
{"key":[{"name":"RK","type":"String","value":"1234567890"}],"columns":{"column_name1":["name":"column_name1","type":"String","value":"111" ],"column_name2":["name":"column_name2","type":"String","value":"222" ],"column_name3":["name":"column_name3","type":"String","value":"333" ]}
}
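To sanity-check this format, the structure can be probed directly with fastjson before running the full job (a small sketch; sampleJson is a hypothetical name holding a trimmed copy of the JSON above):

val sampleJson =
  """{"key":[{"name":"RK","type":"String","value":"1234567890"}],
    |"columns":{"column_name1":[{"name":"column_name1","type":"String","value":"111"}]}}""".stripMargin
val parsed = JSON.parseObject(sampleJson)
println(parsed.getJSONArray("key"))                // the single-element array holding the RK entry
println(parsed.getJSONObject("columns").keySet())  // the column names, e.g. [column_name1]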