In the previous post in this series, 《大数据实战系列(一)Spark 实现读取doris数据写入hive》, we used Spark to read data from Doris and write it into Hive. Here we implement the reverse direction: reading Hive data and writing it to Doris. Straight to the code:
import org.apache.spark.sql.SparkSession

object HiveToDoris {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark Hive to Doris")
      .config("spark.sql.warehouse.dir", "hdfs://xxxxx/user/hive/warehouse")
      .config("hive.metastore.uris", "thrift://xxxxx:9083")
      .enableHiveSupport()
      .getOrCreate()

    // The partition date to sync, passed as the first command-line argument
    val syncDate = args(0)

    // Read one partition of the Hive table
    val hivePartitionTableDF = spark.sql(
      s"SELECT * FROM mdz_doris.dwd_vehicledriverstatusinfotest WHERE pdate = '$syncDate'")

    // Write the data to Doris
    hivePartitionTableDF
      .write
      .format("doris")
      .option("doris.table.identifier", "xxxx.xxxx") // target Doris database.table
      .option("doris.fenodes", "xxxx:xxx")           // Doris FE host:http_port
      .option("user", "xxxx")
      .option("password", "****")
      // other options
      // specify the columns to write
      .option("doris.write.fields", "column1,column2,column3,....")
      .save()

    spark.stop()
  }
}

Finally, remember to add the following dependencies in your Maven pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
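The "other options" placeholder in the write snippet is where sink tuning goes. As one example, the Spark Doris Connector documents sink options such as doris.sink.batch.size and doris.sink.max-retries; the values below are only illustrative, so verify them against the connector docs for version 1.3.0 before relying on them:

    // A sketch of possible "other options": sink tuning knobs documented
    // for the Spark Doris Connector (values here are illustrative)
    hivePartitionTableDF
      .write
      .format("doris")
      .option("doris.table.identifier", "xxxx.xxxx")
      .option("doris.fenodes", "xxxx:xxx")
      .option("user", "xxxx")
      .option("password", "****")
      .option("doris.sink.batch.size", "10000")  // rows buffered per load batch
      .option("doris.sink.max-retries", "3")     // retries for a failed batch
      .save()

Larger batches generally mean fewer load transactions against Doris at the cost of more memory per task.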

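Once the jar is built, the job takes the partition date as its only argument, so a launch looks roughly like spark-submit --class HiveToDoris your-job.jar 2024-05-01 (the class name is the one from the sketch above; the jar name and date are hypothetical). A quick way to sanity-check the result is to read a few rows back from Doris through the same connector, mirroring the read path from part one of this series. A minimal sketch, run before spark.stop() or from a spark-shell, reusing the placeholder connection values from the write snippet:

    // Sanity check: read back from Doris through the connector
    val checkDF = spark.read
      .format("doris")
      .option("doris.table.identifier", "xxxx.xxxx")
      .option("doris.fenodes", "xxxx:xxx")
      .option("user", "xxxx")
      .option("password", "****")
      .load()

    checkDF.show(10)         // eyeball a few rows
    println(checkDF.count()) // compare against the Hive partition's row count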