在flink里面,所有的算子计算都是根据对应的数据结构进行计算的。这篇文章我们就介绍下flink支持哪些数据类型。
一、元组类型
元组类其实就是flink里面所有的Tuple,他是一个不好喊固定数量的各种类型的字段的复合数据类型。这个Tupe不支持null存储。
在flink里面,默认的有Tuple1到Tuple25的类供我们使用,这个数字代表的就是这个tuple里面有多少个字段,如果我们有5个字段,就使用Tuple5,有10个字段就使用Tuple10即可。然后取值的话,可以直接根据tuple对象的f取值,比如取第0个,则直接使用value.f1即可。示例代码如下:
package com.test;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
public class Test {
public static void main(String[] args) throws Exception {
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<Tuple2<String, Integer>> datasources = environment.fromElements(new Tuple2<String, Integer>("张三", 18),
new Tuple2<String, Integer>("李四", 19),
new Tuple2<String, Integer>("王五", 20));
datasources.map(new RichMapFunction<Tuple2<String,Integer>, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer map(Tuple2<String, Integer> value) throws Exception {
return value.f1;
}
}).groupBy(0);
}
}二、对象pojo类型
这种pojo类其实就是java的model对象,我们在前面演示过。这个pojo类比常规类型更易于使用,因为可以完成更加复杂的数据结构的定义。如果在flink中使用pojo类的话,需要注意以下几点:
1、pojo类必须是public的,必须新创建一个 xxxx.java文件,不能在内部类中进行定义。 2、pojo类必须含有默认的空构造方法。 3、pojo类中所有的属性字段都必须是public进行修饰的,同时这些字段需要有setter()和getter()方法 4、pojo类中的属性字段类型必须是flink支持的。
示例代码如下:
package com.flink.demo.model;
import java.io.Serializable;
import com.alibaba.fastjson.JSON;
public class UserPoJo implements Serializable{
/**
*
*/
private static final long serialVersionUID = 875901357635731459L;
private String name;
private Integer age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public UserPoJo(String name, Integer age) {
super();
this.name = name;
this.age = age;
}
public UserPoJo() {
super();
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}三、原生数据类型
这里的原生数据类型就是指的是java的原生基本类型,有:
byte short int long char float double boolean
同时也是支持String对象的数组的。
四、常规类型
这里的常规类型就是在java中的大多数类。但是有一个前提,就是这些类必须要能够被序列化和反序列化,如果不能被序列化和反序列化的话,那么对应的类型flink是不支持的。在flink中序列化和反序列化框架是使用的Kryo。
五、Value类型
在某些序列化和反序列化效率非常低的场景下,我们要使用Value类型。flink常用的Value类型有:
ByteValue ShortValue IntValue LongVlue FloatValue DoubleValue StringValue CharValue BooleanValue
六、hadoop的writable类型
在flink中,我们可以自定义类型,直接实现org.apache.hadoop.Writeble接口的类。此时,在write()方法和readFields()方法中定义的序列化逻辑将用于Writable类型的序列化
七、特殊类型
在flink中有一些特殊的类,例如:Either,Option和Try。这些类是在scala语言中定义的,可以直接使用。









还没有评论,来说两句吧...