在flink中,我们可以有很多外部的数据源,例如:文件、集合、网络流、消息队列、数据库等,我们在使用flink操作数据的时候,首先都需要读取数据源,因此本篇文章我们介绍下flink读取csv文件并转换成pojo对象数据结构。
1、定义一个csv文件,这里我们手动做一个简单的示例数据即可
id,name,age 1,张三,15 2,李四,16
然后把这个文件保存成user.csv
2、定义一个user的实体类,用来解析user.csv
package com.flink.demo.model; import java.io.Serializable; import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import lombok.experimental.Builder; @Data @NoArgsConstructor @AllArgsConstructor @EqualsAndHashCode @Builder public class UserPoJo implements Serializable{ /** * */ private static final long serialVersionUID = 875901357635731459L; private Integer id; private String name; private Integer age; @Override public String toString() { return JSON.toJSONString(this); } }
3、编写一个flinkjob读取csv文件并且打印下实体信息。
package com.flink.demo.show;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import com.flink.demo.model.UserPoJo;
public class ReadCsvJob {
public static void main(String[] args) throws Exception {
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<UserPoJo> source = environment.readCsvFile("D:\\example\\user.csv")
// 所有的csv文件都是以英文逗号进行分割的。
.fieldDelimiter(",")
// 第一行是我们的字段行,不需要进行解析
.ignoreFirstLine()
// 这里是解析第几列的数据,有3列,我们就写3个true,如果有一列不需要囊括进来,则选择false即可。
.includeFields(true, true, true)
// 这里是需要把csv文件里面的字段转换成对应的实体类,同时标记对应的字段名称
.pojoType(UserPoJo.class, new String[] { "id", "name", "age" });
//最后我们把source的内容打印出来
source.print();
}
}4、运行测试一下。
可以看到图中输出了从csv的执行结果。
备注:
1、flink读取csv文件很简单,所有的代码函数的注释都在代码里面,直接看即可。


还没有评论,来说两句吧...