在spark中,目前使用比较强大的就是spark structured streaming,这是在日常工作中经常见到的场景。这篇文章我们介绍下使用流的方式读Delta Lake数据和写Delta Lake数据。
一、从kafka等流中读取数据,处理完成后把数据写入Delta lake中
这里写数据比较简单,就是把以前的sink案例修改一下即可,整个写分为两种方式,一种是追加的方式,一种是每一个批次覆盖一整张表,下面挨个给示例:
1.1、追加的方式
events.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json") .start("/delta/events") // as a path
1.2、每一个批次覆盖的方式
events.writeStream .format("delta") .load("/delta/events") .groupBy("customerId") .count() .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg") .start("/delta/eventsByCustomer")
二、从delta Lake中流式读取数据
这里就是将Delta Lake表用作流式处理源,只能将追加操作包含在流中。除非指定ignoreChanges或者ignoreDeletes选项,否则数据修改将导致错误。代码示例如下:
val stream_df = session.readStream.format("delta") .option("ignoreChanges", "true") .load("/delta/internetorders") stream_df.show()
备注:
1、当把Delta Lake当作流式数据源的时候,其实可以看做是一个cdc。
还没有评论,来说两句吧...