1 个回答
使用如下代码:
val conf = new SparkConf()
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
val session = SparkSession.builder().appName("DeltaLakeDemo").master("local[1]").config(conf).getOrCreate();
val df = session.read.parquet("deltalakefile/data/")
df.repartition(1).write.format("delta").mode("overwrite").save("deltalakefile/data/")
df.show()
session.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
DeltaTable.forPath("deltalakefile/data/").vacuum()
发布于:2年前 (2022-12-19) IP属地:四川省
主要是利用spark的repartition,重新生成一个新的文件,然后把老的文件给删除掉
发布于:2年前 (2022-12-19) IP属地:四川省
我来回答
您需要 登录 后回答此问题!