In the previous post in this series, "Big Data in Practice (1): Reading Doris Data into Hive with Spark", we used Spark to read data from Doris and write it into Hive. This time we go the other way: reading Hive data and writing it to Doris. Straight to the code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Extra Spark settings, if any, go on this SparkConf
val conf = new SparkConf()

val spark = SparkSession.builder()
  .appName("Spark Hive to Doris")
  .config("spark.sql.warehouse.dir", "hdfs://xxxxx/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://xxxxx:9083")
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()

// Partition date to sync, passed as the first program argument
val syncDate = args(0)

// Read one partition of the Hive table
val hivePartitionTableDF = spark.sql(
  s"SELECT * FROM mdz_doris.dwd_vehicledriverstatusinfotest WHERE pdate = '$syncDate'")

// Write the data to Doris
hivePartitionTableDF
  .write
  .format("doris")
  .option("doris.table.identifier", "xxxx.xxxx")
  .option("doris.fenodes", "xxxx:xxx")
  .option("user", "xxxx")
  .option("password", "****")
  // other options
  // specify the columns you want to write
  .option("doris.write.fields", "column1,column2,column3,....")
  .save()

spark.stop()
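Beyond the required options, the connector also exposes sink-tuning knobs that matter at volume. Below is a minimal sketch of how they would be set; the option names (doris.sink.batch.size, doris.sink.max-retries) follow the 1.x connector's documentation and the values are placeholders, so verify both against your connector version:

// Same writer as above, with hypothetical sink tuning; values are illustrative only.
hivePartitionTableDF
  .write
  .format("doris")
  .option("doris.table.identifier", "xxxx.xxxx")
  .option("doris.fenodes", "xxxx:xxx")
  .option("user", "xxxx")
  .option("password", "****")
  // rows buffered per Stream Load batch (placeholder value)
  .option("doris.sink.batch.size", "10000")
  // retries per failed batch before the job fails (placeholder value)
  .option("doris.sink.max-retries", "3")
  .save()

Smaller batches reduce the memory held per Stream Load at the cost of more load transactions, so the right value depends on row width and BE capacity.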
Finally, remember to add the following dependencies to your Maven pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
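To sanity-check the sync, you can read the table back through the same connector, mirroring the read path from part one of this series. A minimal sketch, reusing the placeholder connection settings from above (run it before the spark.stop() call):

// Read the synced rows back from Doris to verify the write.
// The identifiers below are the same placeholders used earlier.
val checkDF = spark.read
  .format("doris")
  .option("doris.table.identifier", "xxxx.xxxx")
  .option("doris.fenodes", "xxxx:xxx")
  .option("user", "xxxx")
  .option("password", "****")
  .load()

println(s"Rows in Doris after sync: ${checkDF.count()}")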