在flink datastream 应用程序开发中,我们知道这里的source数据是从外部读取到的。
这里我们回想一下kafka的客户端,我们在使用kafka客户端的时候,会发现,当我们push数据的时候,数据首先被推动到client的缓存里面,然后client会潘丹当前的缓存大小和缓存时间是否满足需要发送出去的条件,如果满足条件,数据就会被发送到kafka服务端,如果不满足的话,则数据会被存储到client,等到当前的缓存大小和缓存时间满足任一一个的时候,就会发送出去。
那么在flink中其实也是一样的,这样子做的目的是有利于优化吞吐量。所以数据会先被缓存到source的缓冲区,当满足缓冲区的大小的时候,就会把数据批量的发送给对应的函数进行处理。当然在flink中,也是一样的,满足buffersize或者时间的任一一条之后,数据会被发送给对应的算子进行计算。
这里我们介绍时间,所以这里主要介绍的是时间,默认值是100毫秒,也就是数据被放到缓冲区之后,最迟100毫秒之后会发送给对应的函数进行处理。在有时候我们为了数据最更近实时的处理,我们需要把这里的时间设置的短一些,有时候我们的场景由于数据比较少,我们希望间隔大一点再处理数据,可能会把这里的时间设置的长一些,但终究都是根据实际的情况处理。
这里设置source时延的方法是:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setBufferTimeout(5000);
这里我们设置的是5秒。
备注:
1、如果我们为了最大限度的提高吞吐量的话,可以设置下时间为-1,示例如下:
env.setBufferTimeout(-1);
当删除这个超时的时候,只有当数据满足缓冲区的时候,才会被发送给对应的算子进行计算,因此这种方式不太可取。
2、如果想要最大化的提高吞吐量的话,可以将超时时间设置为接近0的值,例如 3毫秒或者5毫秒。但是这样子越接近于0,性能下降的也比较快,所以整体来说需要根据实际的情况进行综合考虑。
还没有评论,来说两句吧...