4 个回答
堆外内存: flink 将原来的MemorySegment变成了抽象类, 并提供了两个具体的子类:HeapMemorySegment 和 HybridMemorySegment。前者是用于分配堆内存, 后者用来分配堆外内存和堆内存的。 flink 加入了对堆外内存的支持, 主要是考虑到以下几个方面: 同上面定制化内存管理 v1.10的好处 但是使用堆外内存同样存在一些潜在的问题:
1、堆内存可以很方便地进行监控和分析, 相较而言堆外内存则更加难以控制。
2、flink 有时可能需要短生命周期的 MemorySegment,在堆上申请开销会更小。
3、一些操作在堆内存上会更快一些。
发布于:3个月前 (02-06) IP属地:四川省
直接操作二进制数据: 以排序为例, flink 会从 MemoryManager 中申请一批 MemorySegment, 这批MemorySegment 称作 sort buffer, 用来存放排序的数据。
1、flink 会把 sort buffer 分成两块区域. 一个区域是用来存放所有对象完整的二进制数据. 另一个区域用来存放指向完整二进制数据的指针以及定长的序列化后的key(key+pointer)。如果需要序列化的key是个变长类型, 如String, 则会取其前缀序列化。当一个对象要加到 sort buffer 中时,它的二进制数据会被加到第一个区域,指针(可能还有key)会被加到第二个区域。
2、将实际的数据和指针加定长key分开存放有两个目的。第一, 交换定长块(key+pointer)更高效,不用交换真实的数据也不用移动其他key和pointer。第二, 这样做是缓存友好的。因为key都是连续存储在内存中的,可以大大减少 cache miss(cpu要访问的数据在Cache中有缓存,称为hit,反之则称为miss).
3、排序的关键是比大小和交换。flink中会先用 key 比大小,这样就可以直接用二进制的key比较而不需要反序列化出整个对象, 因为key是定长的。所以如果key相同(或者没有提供二进制key),那就必须将真实的二进制数据反序列化出来,然后再做比较。之后只需要交换key+pointer就可以达到排序的效果, 真实的数据不用移动。
4、最后访问排序后的数据, 可以沿着排好序的key+pointer区域顺序访问, 通过pointer找到对应的真实数据,并写到内存或外部。
发布于:3个月前 (02-06) IP属地:四川省
定制化序列化框架:在 flink 中处理的数据流通常是同一类型,由于数据集对象的类型固定,对于数据集可以只保存一份对象Schema信息,节省大量的存储空间.同时,对于固定大小的类型,也可通过固定的偏移位置存取。当我们需要访问某个对象成员变量的时候,通过定制的序列化工具,并不需要反序列化整个Java对象,而是可以直接通过偏移量,只是反序列化特定的对象成员变量。如果对象的成员变量较多时,能够大大减少Java对象的创建开销,以及内存数据的拷贝大小。flink 通过 Java Reflection 框架分析基于 Java 的 flink 程序 UDF (User Define Function)的返回类型的类型信息,通过 Scala Compiler分析基于 Scala 的 Flink 程序 UDF 的返回类型的类型信息。类型信息由 TypeInformation 类表示,TypeInformation 支持以下几种类型:
1、BasicTypeInfo: 任意Java 基本类型(装箱的)或 String 类型。
2、BasicArrayTypeInfo: 任意Java基本类型数组(装箱的)或 String 数组。
3、WritableTypeInfo: 任意 Hadoop Writable 接口的实现类。
4、TupleTypeInfo: 任意的 flink Tuple 类型(支持Tuple1 to Tuple25)。 flink tuples 是固定长度固定类型的Java Tuple实现。
5、CaseClassTypeInfo: 任意的 Scala CaseClass(包括 Scala tuples)。
6、PojoTypeInfo: 任意的 POJO (Java or Scala),例如,Java对象的所有成员变量,要么是 public 修饰符定义,要么有 getter/setter 方法。
7、GenericTypeInfo: 任意无法匹配之前几种类型的类。
发布于:3个月前 (02-06) IP属地:四川省
定制化内存管理: fink 并不是将大量对象存在堆上,而是将对象都序列化到一个预分配的内存块上,这个内存块叫做 MemorySegment, 它代表了一段固定长度的内存(默认大小为 32KB),也是 flink 中最小的内存分配单元,每条记录都会以序列化的形式存储在一个或多个MemorySegment。同时flink 采用类似DBMS的sort和join算法,尽可能多地直接操作二进制数据,从而使序列化/反序列化带来的开销达到最小。
1)v1.9及以下版本TaskManager 的堆内存主要被分成了三个部分:
1)v1.9及以下版本TaskManager 的堆内存主要被分成了三个部分:
1、Network Buffers:一定数量的32KB大小的 Buffer,主要用于网络传输.
2、Memory Manager Pool:一个由 MemoryManager 管理的,由众多 MemorySegment 组成的超大集合。Flink 中的算法(如 sort/shuffle/join)会向这个内存池申请 MemorySegment,将序列化后的数据存于其中,使用完后释放回内存池。默认情况下,池子占用了堆内存的70%的大小。
3、Remaning(free)Heap:这部分的内存是留给用户代码以及 TaskManager 的数据结构使用的。因为这些数据结构一般都很小,所以基本上这些内存都是给用户代码使用的。从 GC 的角度来看,可以把这里看成的新生代,也就是说这里主要都是由用户代码生成的短期对象。
2)v1.10及以上TaskManager内存模型: 较于以前版本最大的变化就是将Managed Memory和Network Buffer移至堆外,很大的提高了对堆外空间的使用. 这样做的好处:1、之前的内存模型将大量内存放在堆上,当数据规模量级不断上涨时,可能需要上百GB的堆内空间。而启动一个上百GB的JVM需要很长的时间,并且GC时长也随着提高。使用堆外内存可以极大地减小堆内的空间,可以使得taskmanager可以轻易地扩展至上百GB。
2、高效的IO操作。堆外内存可以基于零拷贝机制减少IO操作过程的cpu copy次数,提升IO性能。
3、堆外空间属于进程间共享,即使JVM崩溃也不会丢失数据,因此可以用来故障恢复。
发布于:3个月前 (02-06) IP属地:四川省
我来回答
您需要 登录 后回答此问题!