将RDD,DataFrame,DataSet之间进行互相转换

RDD -》 DataFrame

*
直接手动转换
scala> val people =
spark.read.json("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.json")
people: org.apache.spark.sql.DataFrame = [age: bigint, name: string] scala> val
people1 =
sc.textFile("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt")
people1: org.apache.spark.rdd.RDD[String] =
/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt
MapPartitionsRDD[18] at textFile at <console>:24 scala> val peopleSplit =
people1.map{x => val strs = x.split(",");(strs(0),strs(1).trim.toInt)}
peopleSplit: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[19] at
map at <console>:26 scala> peopleSplit.collect res6: Array[(String, Int)] =
Array((Michael,29), (Andy,30), (Justin,19)) scala> peopleSplit.to toDF toDS
toDebugString toJavaRDD toLocalIterator toString top scala> peopleSplit.toDF
res7: org.apache.spark.sql.DataFrame = [_1: string, _2: int] scala>
peopleSplit.toDF("name","age") res8: org.apache.spark.sql.DataFrame = [name:
string, age: int] scala> res8.show +-------+---+ | name|age| +-------+---+
|Michael| 29| | Andy| 30| | Justin| 19| +-------+---+
* 通过Scala编程实现 ## 创建 schema scala> val schema =
StructType(StructField("name",StringType)::StructField("age",IntegerType)::Nil)
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(name,StringType,true),
StructField(age,IntegerType,true)) ## 加载RDD数据 scala> val rdd =
sc.textFile("/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt")
rdd: org.apache.spark.rdd.RDD[String] =
/opt/apps/Spark/spark-2.2.2-bin-hadoop2.7/examples/src/main/resources/people.txt
MapPartitionsRDD[1] at textFile at <console>:30 ## 创建Row对象 scala> val data =
rdd.map{x => val strs = x.split(",");Row(strs(0),strs(1).trim.toInt)} data:
org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[2] at map
at <console>:32 ## 生成DF scala> spark.createDataFrame(data,schema) 18/09/06
09:45:00 WARN ObjectStore: Version information not found in metastore.
hive.metastore.schema.verification is not enabled so recording the schema
version 1.2.0 18/09/06 09:45:00 WARN ObjectStore: Failed to get database
default, returning NoSuchObjectException 18/09/06 09:45:02 WARN ObjectStore:
Failed to get database global_temp, returning NoSuchObjectException res0:
org.apache.spark.sql.DataFrame = [name: string, age: int]
* 反射 scala> case class People(name:String,age:Int) defined class People
scala> rdd.map{x => val
strs=x.split(",");People(strs(0),strs(1).trim.toInt)}.toDF res2:
org.apache.spark.sql.DataFrame = [name: string, age: int]
DataFrame -》 RDD
scala> res8.rdd res10: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] =
MapPartitionsRDD[26] at rdd at <console>:31
RDD -》 DataSet
scala> peopleSplit.toDS res11: org.apache.spark.sql.Dataset[(String, Int)] =
[_1: string, _2: int] scala> case class People(name:String,age:Int) defined
class People scala> val peopleDSSplit = people1.map{x => val strs =
x.split(","); People(strs(0),strs(1).trim.toInt)} peopleDSSplit:
org.apache.spark.rdd.RDD[People] = MapPartitionsRDD[27] at map at <console>:28
scala> peopleDSSplit.toDS res12: org.apache.spark.sql.Dataset[People] = [name:
string, age: int] scala> res12.show +-------+---+ | name|age| +-------+---+
|Michael| 29| | Andy| 30| | Justin| 19| +-------+---+
DataSet -》 RDD
scala> res12.rdd res14: org.apache.spark.rdd.RDD[People] =
MapPartitionsRDD[32] at rdd at <console>:33 scala> res14.map(_.name).collect
res15: Array[String] = Array(Michael, Andy, Justin)
DataSet -》 DataFrame
scala> res12.toDF res16: org.apache.spark.sql.DataFrame = [name: string, age:
int]
DataFrame -》 Datset
scala> res16.as[People] res17: org.apache.spark.sql.Dataset[People] = [name:
string, age: int]
 

 

 

技术
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信