在学习flink的时候,大家一般会先去查下资料了解下flink是一个什么东西。但是对于我的了解来说,我一般会先看看这项技术是一个什么东西,能实现什么效果,然后再逐渐推进对应的学习上。由于我这边已经了解Flink了,所以这一系列博文主要是以我自己的认知来叙述Flink,让大家更加充分能了解Flink是什么,能干什么。今天介绍第一篇flink的内容。先写一个demo给大家看看是什么样的。
1、首先创建一个maven项目,在pom里面引入如下依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.14.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.15.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.15.0</version> <scope>provided</scope> </dependency>
2、然后编写一个wordcount代码
package com.big.data.flink.datastream; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.big.data.flink.filter.LineSplitter; import lombok.extern.slf4j.Slf4j; @Slf4j /** * 整个系统里面我们模拟从kafka的队列里面获取每一串字母,然后做一个helloword count计算,并且打印到控制台 * * @author Administrator * */ public class WordCount { public static void main(String[] args) throws Exception { if (null == args || args.length != 2) { // 一般我们在运行的时候,这里的参数都是直接传入进来的,但是我们在本地进行测试,因此这里我们暂时写死一下。 args = new String[] { "192.168.31.20:9092", "t_wordcount", "t1" }; } // 由于参数是的传进来的,所以我们需要动态获取一下。 String brokers = args[0]; String consumerTopic = args[1]; String consumerGroup = args[2]; // 初始化本地运行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 初始化kafka connector信息 KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(consumerTopic) .setGroupId(consumerGroup).setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()).build(); // 开始从kafka里面进行监听 DataStreamSource<String> dataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); // 计数 SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new LineSplitter()).keyBy(0) .sum(1); sum.print(); env.execute("WordCount Example"); } }
3、这里我们使用到了filter,所以要创建一个filter
package com.big.data.flink.filter; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { private static final long serialVersionUID = 1L; @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(" "); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }
4、然后我们编写一个kafka生产者。
kafka生产者的代码见《kafka生产者代码示例demo》
5、我们先运行kafka的生产者代码,会看到消息发送成功了。
6、我们再运行上面的wordcount代码,可以看到这边有对应的wordcount输出:
以上我们的wordcount就运行成功了。
但是对于初学者来说,这个程序到底是什么意思呢?有什么用呢?在这里给大家介绍一下。
1、整个wordcount运行主要是从kafka里面接收每一条消息。
2、接收到的消息是类似于: this is a test。
3、然后我们使用flatmap的函数,把这句话根据空格进行切割,切割后就是【this,is,a,test】
4、然后我们把所有接收到的单词做一下统计,比如this,接收到一次,则存储为:(this,1),如果this出现了两次,则存储为:(this,2)
5、然后我们把结果打印出来。
备注:上诉图片里面我们打印是2,代表我们运行了两次kafka的生产者。大家可以直接尝试一下。
最后用流程图的方式解释下此次wordcount的运行流程:
还没有评论,来说两句吧...