[技术文章] Apache Hudi 介绍

[复制链接]
作者
n2n1   发布于2021-11-18 09:06:09 来自河北
接下来,我将从场景需求、设计决策、功能支持、性能调优和未来工作五个部分来解释我们如何在字节跳动的推荐系统中使用Hudi。

在推荐系统中,我们在以下两个场景中使用了数据湖:
  • 我们使用 BigTable 作为整个系统中近实时处理的数据存储。内部有一个组件TBase,提供BigTable的语义和搜索广告推荐场景中一些需求的抽象,屏蔽底层存储的差异。为了更好的理解,可以直接把它看成一个HBase。在这个过程中,为了服务离线数据分析和挖掘需求,需要将数据导出到离线存储。以往用户要么使用MR/Spark直接访问存储,要么通过扫描数据库获取数据,不符合OLAP场景下的数据访问特性。因此,我们基于数据湖构建了BigTable的CDC,以提高数据的时效性,降低近实时系统的访问压力,
  • 此外,我们还在特征工程和模型训练的场景中使用了数据湖。我们从内部和外部来源获得两种类型的实时数据流。一种是系统返回的实例,其中包括推荐系统在服务时获得的特征。另一个是来自有利位置的事件跟踪和各种复杂的外部数据源的反馈。这种类型的数据被用作标签,并形成具有前面提到的特征的完整机器学习数据样本。对于这个场景,我们需要实现一个基于主键的合并操作,将实例和标签合并在一起。时间窗口范围可能长达数十天,量在千亿行量级。系统需要支持高效的列选择和谓词下推。同时,还需要支持并发更新等相关能力。

这两个场景带来了以下挑战:
  • 数据非常不规则。与Binlog相比,WAL无法获取一行的所有信息,数据量变化较大。
  • 吞吐量比较大。 单表吞吐量超过100GB/s,单表需要PB级存储。
  • 数据模式很复杂。数据是高维和稀疏的。表格列数范围从 1000 到 10000+。并且有很多复杂的数据类型。

设计决策[color=var(--ifm-link-color)]#
在对引擎做出决定时,我们检查了三个最流行的数据湖引擎Hudi、Iceberg和DeltaLake。这三个在我们的场景中各有优缺点。最后,基于 Hudi 对上下游生态系统的开放性、对全局索引的支持以及某些存储逻辑的定制开发接口,选择Hudi作为存储引擎。
  • 对于实时写入,选择时效性较好的MOR。
  • 我们检查索引类型。首先,因为WAL不能每次都拿到数据的分区,所以必须使用全局索引。在几个全局索引的实现中,为了实现高性能写入,HBase 是唯一的选择。其他两个实现与 HBase 有很大的性能差距。
  • 在计算引擎和API方面,当时Hudi对Flink的支持并不完善,所以我们选择了支持更成熟的Spark。为了灵活地实现一些自定义的功能和逻辑,并且因为DataFrame API 有更多的语义限制,我们选择了较低级别的RDD API。

功能支持[color=var(--ifm-link-color)]#
功能支持包括存储语义的 MVCC 和模式注册系统。

首先,为了支持WAL写,我们实现了MVCC的payload,并基于Avro定制了一套带时间戳的数据结构实现。此逻辑通过视图访问对用户隐藏。另外,我们还实现了HBase的append语义,实现了对List类型的追加而不是覆盖。

由于 Hudi 从写入数据中获取模式,因此不方便与其他系统一起使用。我们还需要一些基于schema的扩展,所以我们搭建了一个元数据中心,提供元数据相关的操作。
  • 首先,我们基于内部存储提供的语义实现了原子变化和多站点高可用。用户可以通过界面原子地触发模式更改并立即获得结果。
  • 通过添加版本号实现 Schema 的版本控制。有了版本号后,我们就可以轻松地使用模式而不是来回传递 JSON 对象。多版本也可以灵活实现模式演化。
  • 我们还支持列级别的附加信息编码,以帮助业务在某些场景下实现特殊的扩展功能。我们将列名替换为 ID,以节省存储过程中的成本。
  • 带有 Hudi 的 Spark 作业在运行时,会在 JVM 级别构建本地缓存,并通过 pull 方法将数据与元数据中心同步,以实现对进程内 schema 的 schema 和单例实例的快速访问。

性能调优[color=var(--ifm-link-color)]#
在我们的场景中,性能挑战是巨大的。单表最大数据量达到400PB+,每日增量为PB级,总数据量达到EB级。因此,我们根据性能和数据特性做了一些工作来提高性能。

序列化包括以下优化:
  • Schema:使用Avro进行数据序列化的成本非常昂贵,消耗了大量的计算资源。为了解决这个问题,我们首先使用JVM中的singleton schema实例来避免序列化过程中消耗CPU的比较操作。
  • 通过优化payload逻辑,减少了运行序列化的次数。
  • 借助第三方Avro序列化实现,将序列化过程编译成字节码,提高SerDe的速度,减少内存占用。序列化过程已被修改,以确保我们的复杂架构也可以正确编译。


压实过程的优化如下。
  • 除了默认的 Inline/Async 压缩选项,Hudi 还支持压缩的灵活部署。压缩作业的特征与摄取作业有很大不同。在同一个Spark应用中,不仅无法进行针对性的设置,还存在资源灵活性不足的问题。我们首先构建一个独立部署的脚本,以便可以独立触发和运行压缩作业。一个低成本的混合队列用于压实计划的资源调度。此外,我们还开发了基于规则和启发式的压缩策略。用户的需求通常是保证天级或小时级的SLA,对某些分区的数据进行针对性的压缩,所以提供了针对性的压缩能力。
  • 为了缩短critical compaction的时间,我们通常会提前进行compaction,避免所有的工作都在一个compaction job中完成。但是,如果压缩的文件组有新的更新,则必须再次压缩。为了优化整体效率,我们根据业务逻辑对 FileGroup 何时应该压缩进行了启发式调度,以减少额外的压缩成本。此功能的实际好处仍在评估中。
  • 最后,我们对compaction进行了一些流程优化,比如不使用WriteStatus的Cache等。


作为专为吞吐量设计的存储,HDFS 在集群使用率较高时会出现严重的实时写入故障。通过与HDFS团队的沟通合作,做了一些改进。
  • 首先,我们将原来的数据 HSync 操作替换为 Hflush,以避免分布式更新导致的磁盘 I/O 写放大。
  • 我们根据场景调优进行了激进的管道切换设置,HDFS团队开发了灵活的API,可以控制管道以实现该场景下的灵活配置。
  • 最后,通过日志文件的独立I/O隔离来保证实时写入的及时性。


还有一些小的性能改进、流程修改和错误修复。如果您有兴趣,请随时与我讨论。
未来工作[color=var(--ifm-link-color)]#
未来,我们将在以下几个方面继续迭代。
  • 产品化问题:目前API的使用方式和参数调优对用户的要求很高,尤其是调优运维,需要深入理解Hudi原理才能完成。这阻碍了向用户推广。
  • 生态支持问题:在我们的场景中,技术栈主要在Flink上,未来会探索Flink的使用。此外,上下游使用的应用和环境复杂,需要跨语言、通用的接口实现。目前与 Spark 的绑定很麻烦。
  • 成本和性能问题:一个常见的话题,因为我们的场景比较广泛,优化带来的好处是非常可观的。
  • 存储语义:我们使用 Hudi 作为存储而不是表格式。因此,未来我们计划使用 Hudi 扩展场景,需要更丰富的存储语义。我们将在这方面做更多的工作。

n2n1.cn



回复

使用道具 举报

您需要登录后才可以回帖 登录 | 创建账号

本版积分规则

Archiver|小黑屋|( 冀ICP备2021005463号 )

GMT+8, 2024-4-29 20:22 , Processed in 0.132644 second(s), 27 queries , Gzip On.

N2N1 It社区 n2n1.cn

Copyright © 2001-2021,MeiCheng.

快速回复 返回顶部 返回列表