在前面我们演示了spark streaming的介绍,同时我们也提到过在spark2.x的部分里面提供了全新的结构化流处理也就是这里的spark structured streaming。这两个都是spark的流处理,但是从形象上来说是不同的东西,两者的差别是:
1、spark streaming里面使用的是rdd的数据处理方式,spark structured streaming里面使用的是datastream/dataset的数据处理方式 2、spark streaming使用的是微批聚合再进行批处理的方式,spark structured streaming使用的是类table表的方式来处理,相当于把所有的数据组成了一张大表,然后来一次数据就追加插入数据到这张表里面。 3、spark streaming的处理入口是spark context,spark structured streaming的处理入口是spark sql sessison.
整个spark structured streaming的流程图如下:
下面演示一下spark structured streaming的案例,还是一样的,通过kafka消息队列,然后在程序里面执行单词计数。
一、创建一个maven项目,并且在pom里面引入依赖信息
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.0.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.2.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.0.0</version> </dependency>
二、使用java语言编写一个kafka的生产者
package org.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class KafkaProduer { private static final Logger log = LoggerFactory.getLogger(KafkaProduer.class); private static final String TOPIC = "sp_test"; private static final String BROKER_LIST = "192.168.31.10:9092"; /** * 消息发送确认 0,只要消息提交到消息缓冲,就视为消息发送成功 1,只要消息发送到分区Leader且写入磁盘,就视为消息发送成功 * all,消息发送到分区Leader且写入磁盘,同时其他副本分区也同步到磁盘,才视为消息发送成功 */ private static final String ACKS_CONFIG = "1"; /** * 缓存消息数达到此数值后批量提交 */ private static final String BATCH_SIZE_CONFIG = "1"; private static KafkaProducer<String, String> producer; static { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); properties.setProperty(ProducerConfig.ACKS_CONFIG, ACKS_CONFIG); properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE_CONFIG); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<String, String>(properties); } public void send() { try { // 等待启动日志打印完后再发送消息 String message = "this is a test"; ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, message); producer.send(record, (recordMetadata, e) -> { if (e != null) { log.info("发送消息异常!"); System.out.println("发送失败:"+e.getMessage()); } if (recordMetadata != null) { // topic 下可以有多个分区,每个分区的消费者维护一个 offset System.out.println("发送成功,分区是:"+recordMetadata.partition()+", 位置是: "+recordMetadata.offset()); log.info("消息发送成功:{} - {}", recordMetadata.partition(), recordMetadata.offset()); } }); } catch (Exception e) { e.printStackTrace(); } finally { if (null != producer) { producer.close(); } } } public static void main(String[] args) { KafkaProduer producer = new KafkaProduer(); producer.send(); } }
三、测试运行下kafka produer的代码
kafka生产者没有问题,这里其实使用的还是之前streaming演示里面的kafak生产者。
四、使用scala编写spark structured streaming的应用程序,实现单词计数
package org.example import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery} import org.apache.spark.sql.{DataFrame, SparkSession} object Demo { def main(args: Array[String]): Unit = { //在windows环境下需要设置hadoop的环境变量,不然运行程序的时候会报错 System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master"); // 构建SparkSession实例对象 val session = SparkSession.builder().appName("demo").master("local[1]") // 设置Shuffle分区数目 .config("spark.sql.shuffle.partitions", "3").getOrCreate() // 导入隐式转换和函数库 import session.implicits._ val kafkaStreamDF: DataFrame = session.readStream .format("kafka") .option("kafka.bootstrap.servers", "192.168.31.10:9092") .option("subscribe", "sp_test") .option("startingOffsets", "earliest") //设置每批次消费数据最大值 .option("maxOffsetsPerTrigger", "100000") .load() //进行词频统计 val resultStreamDF: DataFrame = kafkaStreamDF // 获取value字段的值,转换为String类型 .selectExpr("CAST(value AS STRING)") // 转换为Dataset类型 .as[String] // 过滤数据 .filter(line => null != line && line.trim.length > 0) // 分割单词 .flatMap(line => line.trim.split("\\s+")) // 按照单词分组,聚合 .groupBy($"value").count() // 设置Streaming应用输出及启动 val query: StreamingQuery = resultStreamDF.writeStream .outputMode(OutputMode.Complete()) .format("console") .option("numRows", "10") .option("truncate", "false") .option("checkpointLocation","C:\\Users\\Administrator\\Desktop\\fsdownload\\checkpoint") .start() query.awaitTermination() // 查询器等待流式应用终止 query.stop() // 等待所有任务运行完成才停止运行 kafkaStreamDF.checkpoint() resultStreamDF.checkpoint() } }
五、运行下效果看看
备注:
1、所有使用到流计算的地方都需要设置checkpoint,在spark structured streaming里面设置checkpoint的方式和之前是不一样的,这里设置checkpoint需要在writeStream里面的option进行设置。
.option("checkpointLocation","C:\\Users\\Administrator\\Desktop\\fsdownload\\checkpoint")
同时在checkpoint里面存储的信息也和之前不一样,现在存储的信息如下:
还没有评论,来说两句吧...