在flink DataStream应用开发中,source是咱们程序读取数据的地方。在flink 自带的api里面,提供了一些自带的source,我们可以很方便的使用,下面我们介绍下flink应用程序自带的source。
1)readTextFile(path) 从文件中读取,示例代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStream = env.readTextFile("hdfs://192.168.31.218:9000/hello");
这里我们一般读取的话,都是从hdfs上进行读取,除了本地测试,几乎不从本地文件开始读取。关于从文件中读取的api,还有两个扩展的,分别是:
readFile(fileInputFormat, path) 按照指定的文件格式读取文件 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) 基按照指定的文件格式读取文件,可设置定期读取的时间interval,还可以使用filer过滤掉某些文件,这个api用的比较少。
2)socketTextStream从socket中读取数据,示例代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStream = env.socketTextStream("192.168.31.218", 9999);
在上一篇文章《Flink应用开发系列(二十四)DataStream开发之wordcount》我们介绍的案例就是基于socket进行数据读取的。
3)fromCollection(Collection) 从java util collection创建数据流。示例代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStream = env.fromCollection(Arrays.asList("张三","李四"));
这里主要是从集合中读取相关的数据来创建数据流。关于集合的拓展方法有:
#从迭代器中创建数据流 fromCollection(Iterator, Class) #从给定的对象序列中创建数据流 fromElements(T ...) #从迭代器并行创建数据流 fromParallelCollection(SplittableIterator, Class) #基于给定间隔内的数字序列并行生成数据流 generateSequence(from, to)
以上就是我们常见的flink中自带的source,我们在做flink datastream应用开发的时候,有时候会用到。
备注:
1、仅是有时候用到,更多的场景会从第三方数据源或者mq中进行数据读取。 2、除了自带的,我们还可以自定义source,将在下篇文章介绍到
还没有评论,来说两句吧...