This series records practical, ready-to-use code so that you can copy it directly when needed. This post shows how to use Spark to read data from Doris and write it into Hive. Straight to the code:
// Initialize the Spark session with Hive support
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("Doris Reader")
  .config("spark.sql.warehouse.dir", "hdfs://xxxx/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://xxxx:9083")
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .enableHiveSupport()
  .getOrCreate()
val syncDate: String = args(0) // partition date passed as the first program argument
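Passing args(0) straight into the Doris filter is fragile: a malformed date silently produces an empty (or wrong) partition filter. A small validation helper can fail fast instead; this is a sketch, and the `DateArg` object name and the `yyyy-MM-dd` format are assumptions, not part of the original code:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper: validate the sync date before building the Doris filter.
object DateArg {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  // Returns the date unchanged if it parses as yyyy-MM-dd,
  // otherwise throws DateTimeParseException so the job fails up front.
  def validated(raw: String): String = {
    LocalDate.parse(raw, fmt)
    raw
  }
}
```

With it, the assignment becomes `val syncDate = DateArg.validated(args(0))`.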
// Read the Doris partition data
val df = spark
.read
.format("doris")
.option("doris.table.identifier", "xxx.xxx")
.option("doris.fenodes", "xxx:xxx")
.option("user", "....")
.option("password", "****")
.option("doris.filter.query", s"pDate = '$syncDate'")
.load()
// Write df to Hive, option 1: saveAsTable with overwrite and dynamic partitioning
df.write.format("hive").mode(SaveMode.Overwrite).partitionBy("pdate").saveAsTable("mdz_doris.dwd_vehicleDriverStatusInfoTest")
// Write df to Hive, option 2: insert overwrite into a static partition
df.drop("pDate").createOrReplaceTempView("result")
spark.sql(s"insert overwrite table mdz_doris.dwd_vehicledriverstatusinfotest PARTITION(pdate='$syncDate') select * from result")

Remember to add the following dependencies to your Maven pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
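With the dependencies in place, the packaged job can be submitted along these lines. This is only a sketch: the master, deploy mode, main class name, jar name, and the example date are all placeholders, not values from the original post:

```shell
# Sketch of a submission command; class name, jar path and master are placeholders.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.DorisToHive \
  doris-to-hive.jar \
  2024-05-01   # the sync date, read in the job as args(0)
```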