This series collects hands-on code that you can copy and adapt directly. This post shows how to use Spark to read data from Doris and write it into Hive. Straight to the code:
import org.apache.spark.sql.{SaveMode, SparkSession}

// Initialize the Spark session with Hive support
val spark = SparkSession.builder()
  .appName("Doris Reader")
  .config("spark.sql.warehouse.dir", "hdfs://xxxx/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://xxxx:9083")
  .config("hive.exec.dynamic.partition", true)
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .enableHiveSupport()
  .getOrCreate()

// Partition date to sync, passed as the first program argument
val syncDate: String = args(0)

// Read the Doris partition for the given date
val df = spark.read
  .format("doris")
  .option("doris.table.identifier", "xxx.xxx")
  .option("doris.fenodes", "xxx:xxx")
  .option("user", "....")
  .option("password", "****")
  .option("doris.filter.query", s"pDate = '$syncDate'")
  .load()

// Write df into Hive, option 1: saveAsTable.
// Note: SaveMode.Overwrite replaces the whole table, not just the partition.
df.write
  .format("hive")
  .mode(SaveMode.Overwrite)
  .partitionBy("pdate")
  .saveAsTable("mdz_doris.dwd_vehicleDriverStatusInfoTest")

// Write df into Hive, option 2: static-partition INSERT OVERWRITE,
// which overwrites only the target partition
df.drop("pDate").createOrReplaceTempView("result")
spark.sql(
  s"""INSERT OVERWRITE TABLE mdz_doris.dwd_vehicledriverstatusinfotest
     |PARTITION (pdate = '$syncDate')
     |SELECT * FROM result""".stripMargin)
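After either write path has run, a quick sanity check is to count the rows that landed in the target partition. This is a minimal sketch reusing the spark session and syncDate from the example above:

// Minimal sanity check (a sketch, reusing spark and syncDate from above):
// count the rows that landed in the target Hive partition.
spark.sql(
  s"""SELECT count(*) AS row_cnt
     |FROM mdz_doris.dwd_vehicledriverstatusinfotest
     |WHERE pdate = '$syncDate'""".stripMargin)
  .show()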
Also, remember to add the following dependencies to your Maven pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
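For reference, the Doris connector also lets you register the source table through Spark SQL instead of the DataFrame reader. The sketch below assumes the same connector version declared above; the view name doris_src is a made-up placeholder, and the connection options mirror the earlier example:

// Alternative read path (a sketch, assuming spark-doris-connector 1.3.0):
// register the Doris table as a temporary view and query it with SQL.
// "doris_src" is a hypothetical view name; placeholders as in the example above.
spark.sql(
  """CREATE TEMPORARY VIEW doris_src
    |USING doris
    |OPTIONS(
    |  "table.identifier" = "xxx.xxx",
    |  "fenodes" = "xxx:xxx",
    |  "user" = "....",
    |  "password" = "****"
    |)""".stripMargin)
val dfViaSql = spark.sql(s"SELECT * FROM doris_src WHERE pDate = '$syncDate'")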