在spark中,进行流处理的方式无外乎就是数据输入源,数据转换,数据写出这3个大的步骤,根据前面的文章知识点,这里的数据输入源在前面我们演示过了kafka,数据转换其实就是各种dataframe的操作,只有最后一个数据写出没有介绍到,在这里我们分别介绍下spark structured streaming的数据写出操作,即sink操作。下面分别介绍下写入到文件,写入到kafka,写入到mysql的操作。
一、数据写入到文件
val query = rateSourceDF.writeStream .outputMode("append") .format("json") // or "csv" .option("path", "tmp/output") // 设置输出目录 .option("checkpointLocation", "tmp/ck") // 设置 checkpoint .start()
写入到文件,我们需要执行的操作有:
1、设置写出的输出模式,一般都是append为主。
2、设置format格式为json或者csv等。
3、设置输出文件的目录路径。
二、数据写入到kafka
val query = ratesSinkForKafkaDF.writeStream .outputMode("append") .format("kafka") .option("kafka.bootstrap.servers", "192.168.31.10:9092") .option("topic","rates") .option("checkpointLocation", "tmp/rates") .start()
写入到kafka,我们需要执行的操作有:
1、设置写出的输出模式,一般都是append为主。
2、设置format格式为kafka。spark支持直接写入kafka,只需要使用对应的包即可。
3、设置kafka的链接地址。
4、设置kafka写入的topic。
三、数据写入到mysql
val writer = new JDBCSink(url,user, pwd) val query: StreamingQuery = resultStreamDF .writeStream .foreach(writer) .outputMode("update") .trigger(ProcessingTime("25 seconds")) .start()
package org.example import org.apache.spark.sql.ForeachWriter import java.sql._ class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[(String, String)] { val driver = "com.mysql.jdbc.Driver" var connection: Connection = _ var statement: Statement = _ def open(partitionId: Long, version: Long): Boolean = { Class.forName(driver) connection = DriverManager.getConnection(url, user, pwd) statement = connection.createStatement true } def process(value: (String, String)): Unit = { statement.executeUpdate("INSERT INTO ${table_name}" + "VALUES (" + value._1 + "," + value._2 + ")") } def close(errorOrNull: Throwable): Unit = { connection.close } }
写入到mysql的话,目前没有直接可用的sink,需要我们自定义mysql的sink,代码示例同上。
四、输出到控制台显示
val query = rateSourceDF.writeStream .outputMode("append") .format("console") // console data sink .option("truncate",value = false) // 不截断显示 .option("numRows",30) // 每次输出 30 行 .start()
备注:
1、以上4种sink是我们常见的的sink,在实际的情况下,还可能会存在多种sink,此时我们就需要自定义sink了。关于自定义sink,会在后面的文章中介绍到。
还没有评论,来说两句吧...