承接上一篇《Flink学习系列(二)flink是什么》。这篇文章我们讲解下flink的Datasource。
一、什么是flink的datasource
我们在前面介绍了flink是一个分布式的数据处理框架,那么既然是一个分布式的数据处理框架,那么肯定是需要有数据源的。flink的运作就是把数据从数据源读取回来,然后进行分布式计算处理。所以这里我们把读取的数据源就称为flink的datasource。不管是flink的dataset模型还是flink的datastream模型都需要使有datasource。
二、常见的datasource有哪些
我们常见的datasource有:
1、自定义集合
2、各种文件,例如:excel,csv,txt,hdfsfile等。
3、各种输入流,例如:socket
4、各种消息队列,例如:kafka,rocketmq,rabbitmq,pulsar等。
5、数据库,例如:mysql,postgreasql,doris等。
三、演示下对应的示例
3.1)消息队列演示:演示kafka
// 初始化kafka connector信息 KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(consumerTopic).setGroupId(consumerGroup).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build(); // 开始从kafka里面进行监听 DataStreamSource<String> dataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(),"Kafka Source");
3.2)自定义集合演示:演示list
ArrayList<Integer> arrayList = new ArrayList<Integer>(); arrayList.add(1); arrayList.add(2); arrayList.add(3); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> dataStreamSource = env.fromCollection(arrayList);
3.3)读取文件演示:演示读取csv
public void readcsv() { // 初始化本地运行环境 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.readTextFile("hdfs://127.0.0.1:9000/bbb/ccc/d.csv"); }
3.4)读取sql演示:演示读取mysql
mysql的读取比较复杂,需要进行自定义,我们在下一篇里面介绍下。
还没有评论,来说两句吧...