先简单地贴一下代码,后面会再完善。

一、Spark SQL 读取 Hive 表

这里通过 Spark catalog 查询表的字段信息,查询出来的字段列表 colStr 会在下面写入 Doris 时使用。

 注意:这里直接读取了 Hive 表的所有字段。

二,spark自定义输出

这里就是简单封装了一下

实现的效果:

三、通过 Stream Load 方式将数据写入 Doris

   遍历 DataFrame 的每个分区,将数据分批写入 Doris:
// Write the DataFrame to Doris via Stream Load, sending each partition's rows as JSON-array batches.
// Assumes `data` (the DataFrame read in section 1) and `colStr` (its comma-joined column names) are in scope.
// NOTE(review): dorisStreamLoader is captured by the closure below and shipped to executors,
// so DorisStreamLoad must be Serializable — confirm.
val dorisStreamLoader = new DorisStreamLoad("192.168.5.xx:8040",
  "example_db", "assuer_order_test", "root", "root")
val cumsArrays = colStr.split(",")
// "\N" is the Doris/CSV null marker. NOTE(review): for JSON-format loads a JSON null (or omitting
// the key) may be what Doris expects instead — confirm against the Stream Load settings.
val NULL_VALUE: String = "\\N"
val maxRowCount = 5000 // rows per Stream Load request
val maxRetryTimes = 3  // total attempts per request, first try included

data.rdd.foreachPartition(partition => {
  val jsonArrays = new JSONArray()

  // Load one payload. On failure, wait 1000 * attempt ms and retry; after maxRetryTimes failed
  // attempts, throw so the Spark task fails instead of silently dropping the batch.
  @scala.annotation.tailrec
  def loadWithRetry(payload: String, attempt: Int): Unit = {
    val failure: Option[Throwable] =
      try {
        dorisStreamLoader.load(payload)
        None
      } catch {
        case scala.util.control.NonFatal(e) => Some(e)
      }
    failure match {
      case None => ()
      case Some(e) if attempt >= maxRetryTimes =>
        throw new java.io.IOException(s"unable to flush; stream load failed after $attempt attempts", e)
      case Some(e) =>
        try Thread.sleep(1000L * attempt)
        catch {
          case _: InterruptedException =>
            Thread.currentThread.interrupt()
            throw new java.io.IOException("unable to flush; interrupted while doing another attempt", e)
        }
        loadWithRetry(payload, attempt + 1)
    }
  }

  // Send the buffered rows and clear the buffer only after a successful load.
  // Then pause for 1 s to throttle requests.
  def flush(): Unit = {
    loadWithRetry(jsonArrays.toJSONString, 1)
    jsonArrays.clear()
    Thread.sleep(1000L)
  }

  partition.foreach(row => {
    // One JSON object per row, keyed by column name (values are positional, matching colStr order).
    val json = new JSONObject()
    for (i <- 0 until row.size) {
      val field = row.get(i)
      json.put(cumsArrays(i), if (field == null) NULL_VALUE else field.toString)
    }
    jsonArrays.add(json)
    if (jsonArrays.size() >= maxRowCount) flush()
  })
  // Flush the rows left over when the partition ends.
  if (jsonArrays.size() > 0) flush()
})
注意:这里是直接写入 Doris 的 BE 节点。后续代码计划改为传入 3 个 FE 地址,随机选择一个 FE,再通过它获取 BE 进行写入,以实现负载均衡,并且写入失败时可以重试。
 

四,测试

 下面直接给出完整的测试代码,大家可以自行验证:
package com.sjb.spark2doris.test

import java.io.IOException

import com.alibaba.fastjson.{JSONArray, JSONObject}
import com.sjb.spark2doris.{DorisStreamLoad, SparkDataFrame2Doris}
import org.apache.commons.lang3.StringUtils
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalog.Column
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.mutable.ListBuffer
import scala.util.control.{Breaks, NonFatal}

//todo com.sjb.spark2doris.test.SparkDataFrame2DorisTest
/**
 * Test driver that copies a Hive table into Doris.
 *
 * It reads every column of the Hive table `doris_test.fs_plt_assure_orders_test` through
 * Spark SQL. It then writes the rows to the Doris table `example_db.assuer_order_test` via
 * Stream Load, sending each partition's rows as JSON-array batches.
 *
 * Run with no arguments for local mode (`local[*]`); pass any argument for cluster mode.
 */
object SparkDataFrame2DorisTest {
  var LOGGER: Logger = Logger.getLogger(SparkDataFrame2Doris.getClass)
  Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
  Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
  // Jetty lives under org.eclipse.jetty; the former "org.apache.eclipse..." name matched no logger.
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  /** Value written for SQL NULL fields ("\N", the Doris/CSV null marker). */
  private val NullValue: String = "\\N"

  /** Maximum number of rows sent in one Stream Load request. */
  private val MaxRowCount = 5000

  /** Total number of load attempts per batch, first try included. */
  private val MaxRetryTimes = 3

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getName)
    if (args.length < 1) {
      println("本地模式.......................")
      sparkConf.setMaster("local[*]")
      // sys.exit(1)
    } else {
      println("生产模式............")
    }
    // System.setProperty("HADOOP_USER_NAME", "hive")

    // Hive metastore connection.
    // NOTE(review): credentials are hard-coded; move them to configuration before real use.
    sparkConf.set("javax.jdo.option.ConnectionURL",
      "jdbc:mysql://192.168.x.x:3306/hive?createDatabaseIfNotExist=true&characterEncoding=UTF-8")
    sparkConf.set("javax.jdo.option.ConnectionDriverName", "com.mysql.jdbc.Driver")
    sparkConf.set("javax.jdo.option.ConnectionUserName", "hive")
    sparkConf.set("javax.jdo.option.ConnectionPassword", "Hive")

    val spark = SparkSession
      .builder
      .config(sparkConf)
      .enableHiveSupport()
      .config("spark.sql.warehouse.dir", "spark-warehouse")
      .config("dfs.client.use.datanode.hostname", "true")
      .getOrCreate()

    try {
      // Hive session settings.
      spark.sql("set hive.exec.dynamic.partition=true")
      spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
      // spark.sql("use doris_test")
      // spark.sql("show tables").show(100)

      // Print the table's columns, then reuse the same listing for the column names.
      val columns: Dataset[Column] = spark.catalog.listColumns("doris_test", "fs_plt_assure_orders_test")
      columns.show()
      val fieldNames: Array[String] = columns.collect().map(_.name)

      // Back-quote each name (escaping embedded back-quotes) so keyword-named columns still parse.
      val sql = fieldNames
        .map(name => "`" + name.replace("`", "``") + "`")
        .mkString("select ", ",", " from doris_test.fs_plt_assure_orders_test")
      println(sql)
      val data = spark.sql(sql).toDF()

      // NOTE(review): this instance is captured by the closure below and shipped to executors,
      // so DorisStreamLoad must be Serializable — confirm.
      val dorisStreamLoader = new DorisStreamLoad("192.168.5.xx:8040", "example_db", "assuer_order_test", "root", "root")

      data.rdd.foreachPartition(partition => writePartition(partition, fieldNames, dorisStreamLoader))
      LOGGER.info("data write success.....")
    } catch {
      case NonFatal(e) =>
        // Rethrow so the driver exits with a failure instead of looking successful.
        LOGGER.error("failed to write Hive table to Doris", e)
        throw e
    } finally {
      spark.stop()
    }
  }

  /**
   * Sends one partition to Doris in batches of at most [[MaxRowCount]] rows.
   *
   * Each row becomes a JSON object keyed by column name.
   *
   * @param rows       the partition's rows; values are read positionally, so their order must
   *                   match `fieldNames`
   * @param fieldNames column names, in the order they were selected
   * @param loader     Stream Load client used for every batch
   * @throws IOException if a batch cannot be loaded within [[MaxRetryTimes]] attempts
   */
  private def writePartition(rows: Iterator[Row], fieldNames: Array[String], loader: DorisStreamLoad): Unit = {
    val batch = new JSONArray()

    // Load the buffered rows and clear the buffer only after success.
    // Then pause 1 s to throttle requests.
    def flush(): Unit = {
      loadWithRetry(loader, batch.toJSONString)
      batch.clear()
      Thread.sleep(1000L)
    }

    rows.foreach { row =>
      val json = new JSONObject()
      for (i <- 0 until row.size) {
        val field = row.get(i)
        // NOTE(review): "\N" is the CSV null marker; for JSON-format loads a JSON null (or
        // omitting the key) may be what Doris expects — confirm against the load settings.
        json.put(fieldNames(i), if (field == null) NullValue else field.toString)
      }
      batch.add(json)
      if (batch.size() >= MaxRowCount) flush()
    }
    // Send whatever is left once the partition is exhausted.
    if (batch.size() > 0) flush()
  }

  /**
   * Sends one Stream Load payload, retrying on failure.
   *
   * Makes at most [[MaxRetryTimes]] attempts in total and sleeps `1000 * attempt` ms between
   * attempts.
   *
   * @param loader  Stream Load client
   * @param payload JSON-array body to load
   * @param attempt 1-based number of the attempt being made
   * @throws IOException if every attempt fails, or if the thread is interrupted while waiting to
   *                     retry (the interrupt flag is restored first)
   */
  @scala.annotation.tailrec
  private def loadWithRetry(loader: DorisStreamLoad, payload: String, attempt: Int = 1): Unit = {
    val failure: Option[Throwable] =
      try {
        loader.load(payload)
        None
      } catch {
        case NonFatal(e) => Some(e)
      }
    failure match {
      case None => ()
      case Some(e) if attempt >= MaxRetryTimes =>
        throw new IOException(s"unable to flush; stream load failed after $attempt attempts", e)
      case Some(e) =>
        try Thread.sleep(1000L * attempt)
        catch {
          case _: InterruptedException =>
            Thread.currentThread.interrupt()
            throw new IOException("unable to flush; interrupted while doing another attempt", e)
        }
        loadWithRetry(loader, payload, attempt + 1)
    }
  }
}

DorisStreamLoad 这个类的实现也很简单,看过我之前的文章或了解 Doris Stream Load 写入方式的读者应该都清楚;如果还不熟悉,建议先学习一下 Stream Load。

五,总结

       
相比我之前的方案(先用 Flink 读取 HDFS 写入 Kafka,再由 Kafka 写入 Doris),这种方式省事很多。实测一张上百个字段、千万级数据量的 Hive 表,在本地 IDEA 中运行也没有出现 OOM。

        最后,有需要或有疑问的小伙伴可以私信我,我很乐意分享。本文代码也是参考官方代码实现的。

技术
下载桌面版
GitHub
Microsoft Store
SourceForge
Gitee
百度网盘(提取码:draw)
云服务器优惠
华为云优惠券
京东云优惠券
腾讯云优惠券
阿里云优惠券
Vultr优惠券
站点信息
问题反馈
邮箱:[email protected]
吐槽一下
QQ群:766591547
关注微信