1. Creating a DataFrame from an RDD

(1) Creating from a tuple
// Imports used by this and the following examples
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import scala.beans.BeanProperty

object _01_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    val rdd2: RDD[(String, Int, String)] = rdd1.map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toInt, arr(2))
    })
    // 3. Create a DataFrame from the RDD
    // Import the implicit conversions
    import spark.implicits._
    // val dataFrame: DataFrame = rdd2.toDF()
    val dataFrame = rdd2.toDF("name", "age", "gender")
    // Print the schema
    dataFrame.printSchema()
    // Show the dataset
    dataFrame.show()
  }
}
(2) Creating from a case class
object _02_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    val rdd2: RDD[User] = rdd1.map(line => {
      val arr = line.split(",")
      // Wrap the data in the case class
      User(arr(0), arr(1).toInt, arr(2))
    })
    // 3. Create a DataFrame from the RDD
    // Import the implicit conversions
    import spark.implicits._
    val dataFrame: DataFrame = rdd2.toDF()
    // Print the schema
    dataFrame.printSchema()
    // Show the dataset
    dataFrame.show()
  }
}

// A case class automatically provides getter methods for its fields
case class User(id: String, age: Int, gender: String)
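As a side note, with the case class and the same implicits in scope, the RDD can also be turned into a typed Dataset instead of a generic DataFrame; a minimal sketch, assuming the spark session, rdd2 and the User case class from the example above:

import org.apache.spark.sql.Dataset

// Requires `import spark.implicits._` as in the example above
val ds: Dataset[User] = rdd2.toDS()   // typed Dataset[User] rather than DataFrame
ds.printSchema()
ds.show()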
(3) Using spark.createDataFrame(rowRDD, schema) (commonly used)
object _03_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    // Wrap the data in Row objects
    val rowRDD: RDD[Row] = rdd1.map(line => {
      val arr = line.split(",")
      Row(arr(0), arr(1).toInt, arr(2))
    })
    // 3. Associate the rowRDD with a schema
    val schema = StructType(
      List(
        StructField("id", DataTypes.StringType),
        StructField("age", DataTypes.IntegerType),
        StructField("gender", DataTypes.StringType)
      )
    )
    val dataFrame = spark.createDataFrame(rowRDD, schema)
    // Print the schema
    dataFrame.printSchema()
    // Show the dataset
    dataFrame.show()
  }
}
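As a side note, on Spark 2.3 and later the same schema can also be expressed as a DDL string instead of nested StructField calls; a minimal sketch under that assumption, reusing spark and rowRDD from above:

// Spark 2.3+: build the schema from a DDL string
val ddlSchema: StructType = StructType.fromDDL("id STRING, age INT, gender STRING")
val dataFrame2 = spark.createDataFrame(rowRDD, ddlSchema)
dataFrame2.printSchema()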
(4) Creating from a plain Scala class
object _04_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    val rdd2: RDD[User_01] = rdd1.map(line => {
      val arr = line.split(",")
      new User_01(arr(0), arr(1).toInt, arr(2))
    })
    // 3. Create a DataFrame from the RDD
    val dataFrame = spark.createDataFrame(rdd2, classOf[User_01])
    // Print the schema
    dataFrame.printSchema()
    // Show the dataset
    dataFrame.show()
  }
}

// A plain Scala class needs the @BeanProperty annotation so that JavaBean-style getters are generated
class User_01(@BeanProperty val id: String,
              @BeanProperty val age: Int,
              @BeanProperty val gender: String)
(5) Creating from a Java class
object _05_SparkSession {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // 2. Create an RDD from the SparkContext
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[String] = sc.parallelize(List("tom,18,man", "jack,28,woman"), 2)
    val rdd2: RDD[User_03] = rdd1.map(line => {
      val arr = line.split(",")
      new User_03(arr(0), arr(1).toInt, arr(2))
    })
    // 3. Create a DataFrame from the RDD
    val dataFrame = spark.createDataFrame(rdd2, classOf[User_03])
    // Print the schema
    dataFrame.printSchema()
    // Show the dataset
    dataFrame.show()
  }
}

// User_03 is a plain Java class (in its own .java file) with a getter for each field
public class User_03 {
    private String id;
    private int age;
    private String gender;

    public User_03(String id, int age, String gender) {
        this.id = id;
        this.age = age;
        this.gender = gender;
    }

    public String getId() { return id; }
    public int getAge() { return age; }
    public String getGender() { return gender; }
}
2. Creating a DataFrame from structured files

(1) Creating from a JSON file

A JSON file carries its own schema information (field names and types), but every JSON line has to be parsed before the DataFrame can be created, so this approach is relatively slow.
JSON can represent fairly rich data structures, but the structural information is stored redundantly with every record, so it takes up more space.
object _09_ReadJson {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    // Create a DataFrame by reading the JSON file
    val frame = spark.read.json("data/sparksql/person.json")
    frame.printSchema()
    frame.show()
    spark.stop()
  }
}
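Since the cost comes from scanning every line to infer the schema, one common mitigation is to supply the schema up front so Spark can skip the inference pass; a minimal sketch, with hypothetical field names for person.json (adjust to the real file):

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// Hypothetical schema for person.json; with an explicit schema Spark does not scan the file to infer one
val personSchema = StructType(List(
  StructField("name", DataTypes.StringType),
  StructField("age", DataTypes.LongType),
  StructField("gender", DataTypes.StringType)
))
val frame2 = spark.read.schema(personSchema).json("data/sparksql/person.json")
frame2.printSchema()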
(2) Creating from a CSV file

CSV files are compact and take little space.
However, the header has to be supplied, and if the data types need to be inferred, an extra action is triggered before the DataFrame is created and the whole file has to be read, which makes this relatively slow.
object _08_Read_CSV {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val frame: DataFrame = spark.read
      .option("header", "true")     // treat the first line as the header
      .option("inferSchema", true)  // scan all the data to infer column types
      .option("sep", ",")           // specify the delimiter
      .csv("data/sparksql/shop.txt")
    frame.printSchema()
    spark.stop()
  }
}
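Likewise, the extra pass triggered by inferSchema can be avoided by declaring the column types yourself; a minimal sketch, with hypothetical column names and types for shop.txt (replace with the real ones):

// An explicit schema (DDL string form, Spark 2.3+) means no inference pass over the data
val frame2: DataFrame = spark.read
  .option("header", "true")
  .option("sep", ",")
  .schema("shop_id STRING, month STRING, money DOUBLE")
  .csv("data/sparksql/shop.txt")
frame2.printSchema()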
(3) Creating from a Parquet file

A Parquet file carries its own metadata describing the data, so only that metadata needs to be read before the DataFrame is created.
The data is stored compactly and supports compression.
It is a columnar format, so queries are more efficient, making Parquet the better storage format.
object _11_ReadParquet {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    val frame = spark.read.parquet("data/sparksql/par")
    frame.show()
    spark.stop()
  }
}
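For completeness, any of the DataFrames above can be written back out in Parquet format, which is how a directory such as data/sparksql/par is usually produced in the first place; a minimal sketch, with a hypothetical output path:

// Write an existing DataFrame (e.g. the JSON or CSV frame above) out as Parquet
frame.write
  .mode("overwrite")                  // replace the target directory if it already exists
  .parquet("data/sparksql/par_out")   // hypothetical output path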

Beyond the approaches above, a DataFrame can also be created by loading data from external services, for example over a JDBC connection to a database server, from a Hive warehouse, from HBase, or from Elasticsearch. Those will be covered in the next post.
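As a quick preview of the JDBC case, reading from a database typically looks roughly like the following; the connection details are hypothetical and depend entirely on your database and driver:

// Hypothetical MySQL connection; requires the matching JDBC driver on the classpath
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("dbtable", "user")
  .option("user", "root")
  .option("password", "123456")
  .load()
jdbcDF.printSchema()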
