Earlier we introduced Spark Streaming, and we also mentioned that Spark 2.x ships a completely new structured stream-processing engine: Spark Structured Streaming. Both are Spark stream-processing frameworks, but conceptually they are different things. The differences are:
1. Spark Streaming processes data as DStreams built on RDDs, whereas Spark Structured Streaming processes data as DataFrames/Datasets.
2. Spark Streaming collects incoming records into micro-batches and then runs batch jobs on them, whereas Spark Structured Streaming uses a table-like model: all the data is treated as one unbounded table, and every record that arrives is appended as a new row to that table.
3. The entry point of Spark Streaming is the StreamingContext (built on top of SparkContext), whereas the entry point of Spark Structured Streaming is Spark SQL's SparkSession.
A minimal side-by-side sketch of the two entry points follows.
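Both fragments below are spark-shell style sketches only, not part of the demo in this post: the socket host/port are placeholders, and the DStream half additionally needs the spark-streaming_2.12 artifact, which the pom below does not include.

// Spark Streaming (DStream API): the entry point is a StreamingContext and each micro-batch is an RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("dstream-sketch")
val ssc = new StreamingContext(conf, Seconds(5)) // 5-second micro-batches
ssc.socketTextStream("localhost", 9999) // placeholder source
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .print()
ssc.start()
ssc.awaitTermination()

// Structured Streaming: the entry point is a SparkSession and the stream is an unbounded DataFrame
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("structured-sketch").getOrCreate()
import spark.implicits._
spark.readStream
  .format("socket") // placeholder source; the demo below uses Kafka instead
  .option("host", "localhost")
  .option("port", 9999)
  .load() // every arriving line is appended as a new row of the unbounded table
  .as[String]
  .flatMap(_.split("\\s+"))
  .groupBy("value").count()
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()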
The overall flow of Spark Structured Streaming is illustrated below:
Below we walk through a Spark Structured Streaming example. As before, messages are sent through the Kafka message queue, and the program performs a word count on them.
1. Create a Maven project and add the dependencies to the pom
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>${lombok.version}</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
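The pom above references the ${scala.version} and ${lombok.version} placeholders, so a <properties> block is also needed. The values below are only assumptions for illustration: the Scala version has to be a 2.12.x release to match the _2.12 artifacts (Spark 3.0.0 itself is built against 2.12.10), and the Lombok version is just an example.

<properties>
    <!-- assumed value: any 2.12.x release matches the _2.12 Spark artifacts -->
    <scala.version>2.12.10</scala.version>
    <!-- example value only -->
    <lombok.version>1.18.24</lombok.version>
</properties>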
2. Write a Kafka producer in Java
package org.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class KafkaProduer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProduer.class);

    private static final String TOPIC = "sp_test";

    private static final String BROKER_LIST = "192.168.31.10:9092";

    /**
     * Acknowledgement level for sends:
     * 0   - the send is considered successful as soon as the record reaches the send buffer
     * 1   - the send is considered successful once the partition leader has written the record
     * all - the send is considered successful only after the leader and the other replicas have written it
     */
    private static final String ACKS_CONFIG = "1";

    /**
     * batch.size (in bytes) of records buffered per partition before a batch is sent;
     * kept tiny here so test messages go out almost immediately
     */
    private static final String BATCH_SIZE_CONFIG = "1";

    private static KafkaProducer<String, String> producer;

    static {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.setProperty(ProducerConfig.ACKS_CONFIG, ACKS_CONFIG);
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE_CONFIG);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(properties);
    }

    public void send() {
        try {
            String message = "this is a test";
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, message);
            producer.send(record, (recordMetadata, e) -> {
                if (e != null) {
                    log.info("Failed to send message!");
                    System.out.println("Send failed: " + e.getMessage());
                }
                if (recordMetadata != null) {
                    // a topic can have multiple partitions; each consumer tracks an offset per partition
                    System.out.println("Send succeeded, partition: " + recordMetadata.partition() + ", offset: " + recordMetadata.offset());
                    log.info("Message sent successfully: {} - {}", recordMetadata.partition(), recordMetadata.offset());
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != producer) {
                producer.close();
            }
        }
    }

    public static void main(String[] args) {
        KafkaProduer producer = new KafkaProduer();
        producer.send();
    }
}
3. Test-run the Kafka producer code
The Kafka producer works as expected; it is in fact the same producer used in the earlier Spark Streaming demo.
4. Write the Spark Structured Streaming application in Scala and implement the word count
package org.example
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}
object Demo {
def main(args: Array[String]): Unit = {
//on Windows the Hadoop home directory (winutils) must be set, otherwise the program throws an error at startup
System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master");
// build the SparkSession instance
val session = SparkSession.builder().appName("demo").master("local[1]")
// set the number of shuffle partitions
.config("spark.sql.shuffle.partitions", "3").getOrCreate()
// import implicit conversions (needed for .as[String] and the $ column syntax)
import session.implicits._
val kafkaStreamDF: DataFrame = session.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "192.168.31.10:9092")
.option("subscribe", "sp_test")
.option("startingOffsets", "earliest")
//cap the number of offsets consumed per trigger (micro-batch)
.option("maxOffsetsPerTrigger", "100000")
.load()
//perform the word count
val resultStreamDF: DataFrame = kafkaStreamDF
// take the value column and cast it to String
.selectExpr("CAST(value AS STRING)")
// convert to a Dataset[String]
.as[String]
// filter out null/blank lines
.filter(line => null != line && line.trim.length > 0)
// split each line into words
.flatMap(line => line.trim.split("\\s+"))
// group by word and aggregate
.groupBy($"value").count()
// configure the streaming sink and start the query
val query: StreamingQuery = resultStreamDF.writeStream
.outputMode(OutputMode.Complete())
.format("console")
.option("numRows", "10")
.option("truncate", "false")
.option("checkpointLocation","C:\\Users\\Administrator\\Desktop\\fsdownload\\checkpoint")
.start()
query.awaitTermination() // block until the streaming query terminates (fails or is stopped externally)
query.stop() // stop the query once awaitTermination returns
}
}

5. Run it and see the result
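One remark on what you will see on the console: because the query uses OutputMode.Complete(), the entire aggregated word-count table is reprinted after every trigger, which is easy to read but grows with the number of distinct words. Update mode only emits the rows whose counts changed in the last trigger. The snippet below is only a sketch of that variant, reusing resultStreamDF from the program above; the separate checkpoint path is an assumption, since each query needs its own checkpoint directory.

// same aggregation as above, but Update mode only prints rows whose counts changed in the last trigger
val updateQuery: StreamingQuery = resultStreamDF.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .option("numRows", "10")
  .option("truncate", "false")
  // assumed separate directory so this query does not share a checkpoint with the Complete-mode query
  .option("checkpointLocation", "C:\\Users\\Administrator\\Desktop\\fsdownload\\checkpoint_update")
  .start()
updateQuery.awaitTermination()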
Notes:
1. Every place that uses stream computation needs a checkpoint. In Spark Structured Streaming the checkpoint is configured differently from before: it is set through an option on writeStream.
.option("checkpointLocation","C:\\Users\\Administrator\\Desktop\\fsdownload\\checkpoint")
The information stored in the checkpoint directory is also different from before; it now contains the following:





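2. As a side note (an optional alternative, not what the program above does): instead of passing checkpointLocation on every writeStream, a default checkpoint root can be configured once on the SparkSession through the spark.sql.streaming.checkpointLocation setting; each query then writes its checkpoint data into its own subdirectory under that root. A minimal sketch, with an example path:

// set a default checkpoint root on the session instead of per-query options
val session = SparkSession.builder()
  .appName("demo")
  .master("local[1]")
  .config("spark.sql.shuffle.partitions", "3")
  // example path; every streaming query gets its own subdirectory under this root
  .config("spark.sql.streaming.checkpointLocation", "C:\\checkpoint-root")
  .getOrCreate()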