最近处于学习复盘的阶段,因此准备写一系列关于flink的实战相关的文章。今天介绍下第一篇,在flink中使用DataSet API处理有界数据流。
学习下基础知识
1、DataSet API是flink里面做批量计算的一种api。
2、有界数据流就是指每一次处理的数据是以批次的方式给出的,这个批次没有后续数据增加的情况,例如:分析一篇文章,那么此文章的内容就是一个批次,这就是有界的数据流。
3、DataSet API的任务在flink集群里面执行的时候,是一个一次性任务,不是常驻的任务。一次性任务就是指任务执行完了后,job就会被停止掉,如果需要继续执行,则需要重新启动。
实战化
这里我们主要演示下使用DataSet API处理一批英文语句,然后进行下wordcount操作。
一、编写一个BatchWordCount的类,代码如下:
package com.flink.demo; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; /** * 此类主要介绍下使用flink的DataSet API处理有界的数据流 * @author Administrator * */ public class BatchWordCount { public static void main(String[] args) throws Exception { //获取一下flink的执行环境 ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); //随便构建一个数据集 DataSource<String> datas = environment.fromElements("My hobby is listening to music" ,"When I was a child I enjoy listening to music especially the cartoon music" ,"I often lose myself in it" ,"When I am in trouble" ,"music can make me calm down"); //根据第0个字段进行group by 再把第一个字段进行求和 AggregateOperator<Tuple2<String, Integer>> ds = datas.flatMap(new LineSplitter()).groupBy(0).sum(1); ds.print(); } }
这里我们主要是做如下操作
1、构建一个有界的数据集,这里我们构建有界的数据集可以是一个数组,也可以是一个list,也可以是直接读取某个文件的内容等多样化的数据源数据构成。 2、把这些数据集进行数据切分,同时进行group by再求和。 4、最后把求和后的结果打印出来。
上面还要用到一个切割方法类
package com.flink.demo; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * 切割英文单词 * * @author Administrator * */ public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { /** * */ private static final long serialVersionUID = -230153465595371704L; @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { // 英文的词语之间使用的是空格进行分割 String[] words = value.split(" "); for(String word : words) { out.collect(new Tuple2<String, Integer>(word, 1)); } } }
二、测试验证
执行BatchWordCount类的main方法。查看执行结果
以上我们使用Data Set API处理有界数据流的实战就完成了,最后附上源码下载:下载源码
还没有评论,来说两句吧...