我们在写flink的kob代码的时候,大家心理应该都有印象,第一步就是需要去获取一个flink的执行环境。例如在之前的代码里面我们是直接用的如下代码:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
这篇文章我们就来介绍下Flink开发如何获取执行环境,获取什么样的执行环境。
一、介绍下获取环境使用的类信息
前面介绍过了,在flink中,最基本的主要就是流处理和批处理。所以一些的执行环境获取都是依赖于流处理或者批处理的类来执行的。
在流处理里面,我们获取执行环境使用的类是:
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
在批处理里面,我们获取执行环境使用的类是:
org.apache.flink.api.java.ExecutionEnvironment
二、获取执行环境的三个方法
目前获取执行环境的方法主要有三个,我们可以直接通过上面的类进行获取即可
2.1、getExecutionEnvironment()方法
使用此方法会自动获取当前的执行环境,是最常用的获取执行环境的方式,特别是线上,几乎全是这种获取执行环境的方式。
示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ExecutionEnvironment env2 = ExecutionEnvironment.getExecutionEnvironment();
2.2、createLocalEnvironment()方法
使用此方式,将返回一个本地执行环境,需要在调度的时候指定并行度。
示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setParallelism(1); ExecutionEnvironment env2 = ExecutionEnvironment.createLocalEnvironment(); env2.setParallelism(1);
2.3、createRemoteEnvironment()方法
使用此方法,返回一个远程集群的执行环境,在创建的时候,我们需要指定远程flink集群的作业管理器的ip地址、端口、并行度、jar包位置等。
示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.31.30", 6021, 5,"/FlinkDemo.jar"); ExecutionEnvironment env2 = ExecutionEnvironment.createRemoteEnvironment("192.168.31.30", 6021, 5,"/FlinkDemo.jar");
三、使用Table API/SQL如何获取环境
在使用table api/sql的时候,我们从前面的案例来说,可以看到还创建了一个table的环境。这个table的环境,主要是用于执行Table API/SQL的。
示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment.create(env); ExecutionEnvironment env2 = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment.create(env2);
四、如何配置执行环境的参数
在我们获取到了执行环境之后,某些参数我们是可以进行手动修改的,所以我们可以直接在这里进行配置即可,下面分别介绍下有哪些参数。
4.1、setParallelism()方法
这个方法主要是设置并行度的,这个并行度将使所有算子与并行实例一起运行。示例如下:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);
4.2、setBufferTimeout()方法
这个方法只有在StreamExecutionEnvironment才会存在,也就是说在DataStream的模式下才会存在,在Data Set模式下是不存在,他主要是用来设置刷新输出缓冲区的最大时间频率,单位是ms。由于在实际生产环境中,我们的流数据他不是一个个在网络中进行传输的,因此可以先将数据缓存起来,再进行批量传输。这样子虽然增加了延迟,但是还是属于近实时。
备注:在这个方法里面设置的参数可以是正整数,0,或者是-1。如果是正整数的话,吞吐量会增加,但是会有一定的延迟。生产上我们的这个时间一般还是设置的毫秒级别,所以几乎是近实时的实际表现。如果是0的话,则代表每条记录后都会触发,属于准实时,但是吞吐量不高。如果是-1的话,则代表缓存中的数据满了才会被发送,这样子会造成很多延迟,一般使用率较低。
具体使用示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setBufferTimeout(100);
4.3、setMaxParallelism()方法
这个方法主要是设置最大允许的并行度,用以限定动态扩展并行度的限制。示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setMaxParallelism(5);
4.4、setStateBackend()方法
这个方法主要是设置状态后端,在flink上内置了3种默认的状态后端,可以直接开箱即用。这三种分别是
1、MemoryStateBackend 用内存进行存储,常用于小状态和本地调试 2、FsStateBackend 用文件系统存储状态,常用于大状态、长窗口、高可用的场景 3、RocksDBStateBackend 用Rocks数据库存储状态,常用于超大状态、长窗口、可增量检查点和高可用场景
示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new MemoryStateBackend());
4.5、setRestartStrategy()方法
这个方法是用于设置故障重启后的策略,当任务的失败率上升到一定的程度时,Flink认为本次任务最终是失败的。示例如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(1, TimeUnit.MINUTES), Time.of(3, TimeUnit.MINUTES)));
这里代表的是最大失败次数是3次,衡量失败次数的时间间隔是1分钟,两次重启间隔3分钟。
备注:这里只是演示数据,不对合理性进行评估。
以上的内容主要就是讲解下flink初始化运行时环境的信息,包括如何创建、如何配置等。
还没有评论,来说两句吧...