在实际的flink开发中,我们还会涉及到自定义一些参数,然后把这些参数的值传递给某些方法,这样子在这些方法里面就可以获取到对应配置的自定义参数了。
在日常的java开发中,我们经常会涉及到使用构造函数向某个类函数传递参数,在flink中,这种方式也是适用的。但是我们还是介绍一下,在flink应用开发里面常使用的产地参数给函数的方法。
在flink应用开发中,我们常用的主要事依靠configuration来定义自定义参数,然后使用withParameters(config)的方式向类传递参数,下面我们演示一下,完整代码示例如下:
package org.example; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; public class ConfigurationJob { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Integer> toFilter = env.fromElements(1, 2, 3); Configuration config = new Configuration(); config.setInteger("userId", 2); toFilter.filter(new RichFilterFunction<Integer>(){ private Integer filterUserId; @Override public void open(Configuration parameters) throws Exception { filterUserId = parameters.getInteger("userId",0); super.open(parameters); } @Override public boolean filter(Integer value) throws Exception { return filterUserId == value; } }).withParameters(config).print(); } }
上面的代码我们可以看到:
1、首先我们new了一个configuration来自定义参数,在实际的开发中,这些参数其实很可能是我们在启动的时候就添加上去了,所以就不需要再new了,直接获取环境的configuration即可。 2、哪个函数需要使用自定义的参数,我们就使用.withParameters(config)这个方法把参数传递给具体的函数。 3、在使用的函数里面,一般都会有一个open方法,这里的open方法传递进来的参数就是configuration,我们从这里的configuration里面获取定义的值即可。 4、这样子具体的函数里面就可以获取到我们想要的自定义参数了。
本文的代码案例使用的是filter进行演示,我们运行下看下效果:
改进点:
1、这里我们可以看到使用哪个函数,我们就要调用一次.withParameters(config)这个方法,那么有没有办法可以做一个全局,也就是一次性设置,所有的函数都可以获取到呢?答案是有的,示例如下:
env.getConfig().setGlobalJobParameters(config);
这样子就不需要再每一个调用的地方使用withParameters(config)方法了,同样的在函数的open方法里面可以获取到对应的信息。
最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...