在flink里面,所有的算子计算都是根据对应的数据结构进行计算的。这篇文章我们就介绍下flink支持哪些数据类型。
一、元组类型
元组类其实就是flink里面所有的Tuple,他是一个不好喊固定数量的各种类型的字段的复合数据类型。这个Tupe不支持null存储。
在flink里面,默认的有Tuple1到Tuple25的类供我们使用,这个数字代表的就是这个tuple里面有多少个字段,如果我们有5个字段,就使用Tuple5,有10个字段就使用Tuple10即可。然后取值的话,可以直接根据tuple对象的f取值,比如取第0个,则直接使用value.f1即可。示例代码如下:
package com.test; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.IntCounter; import org.apache.flink.api.common.accumulators.SimpleAccumulator; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; public class Test { public static void main(String[] args) throws Exception { ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18), new Tuple2<String, Integer>("李四", 19), new Tuple2<String, Integer>("王五", 20)); datasources.map(new RichMapFunction<Tuple2<String,Integer>, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer map(Tuple2<String, Integer> value) throws Exception { return value.f1; } }).groupBy(0); } }
二、对象pojo类型
这种pojo类其实就是java的model对象,我们在前面演示过。这个pojo类比常规类型更易于使用,因为可以完成更加复杂的数据结构的定义。如果在flink中使用pojo类的话,需要注意以下几点:
1、pojo类必须是public的,必须新创建一个 xxxx.java文件,不能在内部类中进行定义。 2、pojo类必须含有默认的空构造方法。 3、pojo类中所有的属性字段都必须是public进行修饰的,同时这些字段需要有setter()和getter()方法 4、pojo类中的属性字段类型必须是flink支持的。
示例代码如下:
package com.flink.demo.model; import java.io.Serializable; import com.alibaba.fastjson.JSON; public class UserPoJo implements Serializable{ /** * */ private static final long serialVersionUID = 875901357635731459L; private String name; private Integer age; public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public UserPoJo(String name, Integer age) { super(); this.name = name; this.age = age; } public UserPoJo() { super(); } @Override public String toString() { return JSON.toJSONString(this); } }
三、原生数据类型
这里的原生数据类型就是指的是java的原生基本类型,有:
byte short int long char float double boolean
同时也是支持String对象的数组的。
四、常规类型
这里的常规类型就是在java中的大多数类。但是有一个前提,就是这些类必须要能够被序列化和反序列化,如果不能被序列化和反序列化的话,那么对应的类型flink是不支持的。在flink中序列化和反序列化框架是使用的Kryo。
五、Value类型
在某些序列化和反序列化效率非常低的场景下,我们要使用Value类型。flink常用的Value类型有:
ByteValue ShortValue IntValue LongVlue FloatValue DoubleValue StringValue CharValue BooleanValue
六、hadoop的writable类型
在flink中,我们可以自定义类型,直接实现org.apache.hadoop.Writeble接口的类。此时,在write()方法和readFields()方法中定义的序列化逻辑将用于Writable类型的序列化
七、特殊类型
在flink中有一些特殊的类,例如:Either,Option和Try。这些类是在scala语言中定义的,可以直接使用。
还没有评论,来说两句吧...