<>RDD、DataFrame、DataSet 三者的关系

➢ Spark1.0 => RDD
➢ Spark1.3 => DataFrame
➢ Spark1.6 => Dataset

如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark版本DataSet
有可能会逐步取代RDD和DataFrame 成为唯一的API 接口。

<>三者的共性

➢ RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集,为处理超大型数据提供便利;
➢ 三者都有惰性机制,在进行创建、转换,如map 方法时,不会立即执行,只有在遇到Action 如 foreach 时,三者才会开始遍历运算;
➢ 三者有许多共同的函数,如 filter,排序等;
➢ 在对DataFrame 和Dataset 进行操作许多操作都需要这个包:import spark.implicits._(在创建好
SparkSession 对象后尽量直接导入)
➢ 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
➢ 三者都有 partition 的概念
➢ DataFrame 和DataSet 均可使用模式匹配获取各个字段的值和类型

<>三者的区别

RDD
➢ RDD一般和 spark mllib 同时使用
➢ RDD不支持 sparksql 操作

DataFrame
➢ 与RDD和Dataset 不同,DataFrame 每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
➢ DataFrame 与DataSet 一般不与 spark mllib 同时使用
➢ DataFrame 与DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之类,还能注册临时表/视窗,进行 sql
语句操作
➢ DataFrame 与DataSet 支持一些特别方便的保存方式,比如保存成 csv,可以带上表头,这样每一列的字段名一目了然

DataSet
➢ Dataset 和DataFrame 拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame 其实就是DataSet 的一个特例
typeDataFrame = Dataset[Row]
➢ DataFrame
也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的 getAS
方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset 中,每一行是什么类型是不一定的,在自定义了case class
之后可以很自由的获得每一行的信息

<>RDD、DataFrame、DataSet 三者的转换

<>DataFrame与DataSet转换
package sparkSQL.study import org.apache.spark.SparkConf import org.apache.
spark.sql.{Dataset, SparkSession} object DF_DS { def main(args: Array[String]):
Unit = { val conf = new SparkConf().setAppName("DataFrame_DateSet...").setMaster
("local[*]") val sparkSession = SparkSession.builder().config(conf).getOrCreate(
) import sparkSession.implicits._ // TODO DataFrame <==> DataSet val rdd =
sparkSession.sparkContext.makeRDD(List(("zahngsan", 18), ("lisi", 20))) val
dataFrame= rdd.toDF("name", "age") dataFrame.show() val ds = dataFrame.as[
UserDF_DS] ds.show() val dataFrame1 = ds.toDF() dataFrame1.show() sparkSession.
stop() } case class UserDF_DS( name:String, age:Int) }
<>DataFrame与RDD转换
package sparkSQL.study import org.apache.spark.SparkConf import org.apache.
spark.sql.SparkSession object DF_RDD { def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrame_RDD...").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(conf).getOrCreate() import
sparkSession.implicits._ // TODO DataFrame <==> RDD val rdd = sparkSession.
sparkContext.makeRDD(List(("zahngsan", 18), ("lisi", 20))) val dataFrame = rdd.
toDF("name", "age") dataFrame.show() val rdd1 = dataFrame.rdd rdd1.collect().
foreach(println) sparkSession.stop() } }
<>RDD与DataSet转换
package sparkSQL.study import org.apache.spark.SparkConf import org.apache.
spark.sql.SparkSession object RDD_DS { def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataSet_RDD...").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(conf).getOrCreate() import
sparkSession.implicits._ // TODO RDD <==> DataSet val rdd = sparkSession.
sparkContext.makeRDD(List(("zahngsan", 18), ("lisi", 20))) val ds = rdd.map {
case (name, age) => { UserRDD_DS(name, age) } }.toDS() ds.show() val rdd1 = ds.
rdd rdd1.collect().foreach(println) sparkSession.stop() } case class UserRDD_DS(
name:String, age:Int) }
所需依赖:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</
artifactId> <version>3.0.1</version> </dependency>

技术
今日推荐
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信