数据仓库在国内起源于上世纪90年代,金融行业率先开启数据仓库时代。在2013年以后,互联网大数据技术逐步兴起,依托Hadoop生态组件创建的Lambda架构数据仓库模式逐步占领主宰地位(目前也是主导地位),2018年之后大家逐渐发现Lambda架构的不足,开始着手向Kappa架构演进(从使用效果来看不稳定,很少有公司使用这种模式),2020年开始兴起的数据湖技术已经被腾讯、微博等互联网公司付诸实践,相信在不久的未来能够替代Lambda架构。
一 传统数据仓库模式在上世纪90年代开始,一直到2010年之前,以Oracle、DB2为代表的商业数据库占领了国内金融行业和大中型企业的市场。这个阶段的数据仓库模型如下图所示:
这一时段的数据仓库架构主要是将Kimball架构和Inmon CFI架构嫁接。这种架构利用了CFI中处于中心地位的EDW,此处的EDW完全与分析和报表应用端隔离。它仅仅作为Kimball分隔的展现区的数据来源,其中的数据是包含维度的、原子的(还有些聚集数据)、以过程为中心。
数据源主要是传统DB数据库,比如Oracle、DB2、MySQL、SQLServer以及一些ftp文本文件等结构化的数据。ETL工具主要是:Informatica、DataStage、Kettle或者DBLink直连的方式。BI报表:主要是Oracle的BIEE。痛点传统数据仓库往往要面临从采购服务器,建立物理仓库到逻辑仓库等一个较长的周期,所以数据仓库面临的第一个挑战就是怎样去降低建设周期。
随着大数据的到来,传统数据仓库管理的大多是结构化数据。如何对半结构化的数据进行统一全面的管理就成为传统数据仓库面临的第二个挑战。
随着数据处理种类的多样化和数据量的不断变大,不同的技术被叠加在一起从而使得数据仓库架构变得越发复杂。同一企业里往往会同时存在各种技术类型的数据仓库。
二 互联网时代Lambda架构数据仓库目前大多数公司的数据架构仍然是Lambda架构,它解决了这些公司大数据的离线和实时数据处理,一个典析的Lambda架构如下图所示:
从底层的数据源开始,通过Kafka、Flume等大数据组件,将各种各样的数据同步到大数据平台,然后分成两条线进行计算。一条线进入离线批量数据处理平台(Spark、Hive、MapReduce等),去计算T+1或者H+1的业务指标,这些指标需要T+1或者H+1才能看到;另外一条线是进入到实时数据处理平台(Flink、SparkStreaming等),去计算实时统计指标。
经过多年的发展,Lambda架构比较稳定,能满足过去的应用场景。但是它有很多致命的弱点:
2.1 数据口径不一致问题因为离线和实时计算走的是两个完全不同的代码,算出来的结果往往不同,可能会当天看到一个结果数据,第二天发现数据变成了。
2.2 T+1离线严重超时像新浪微博这种体量的公司,每天有400TB+的数据写入大数据平台,而且数据在不断地增加。我们经常会发现在夜间3-4个小时内,离线程序执行不完,不能保证数据在上班之前准时生成。尤其是在夜间发生故障之后,白天的数据产出时间更加难以把控。
2.3 需要维护两套代码每次数据源有变化,或者业务方有新的需求。都要修改两次业务逻辑代码,既要修改离线的ETL任务,又要修改流式任务,开发周期很长(工作量是双倍),人力成本比较大。
为了解决Lambda架构的痛点,就产生了Kappa架构,相信大家对这个架构也非常熟悉。
三 Kappa架构针对Lambda架构需要维护两套程序的缺点,后面产生了Kappa架构。Kappa架构的核心思想是,改进流计算系统来解决全量数据,让实时和离线处理过程采用同一套代码。Kappa架构的初衷是,只有在必要的时候才会对历史数据进行重新计算。下图是Kappa架构模型:
Kappa架构也不是完美的,它也有很多问题。
3.1 链路更加混乱复杂首先,我们需要借用Kafka来构建实时场景,但是如果需要对ODS层数据做进一步的分析时,就要接入Flink计算引擎把数据写入到DWD层的Kafka,同样也会将一部分结果数据写入到DWS层的Kafka。但是,如果想做简单的数据分析时,又要将DWD和DWS层的数据写入到ClickHouse、ES、MySQL或者是Hive里做进一步分析,这无疑带来了链路的复杂性。
3.2 数据一致性受到挑战其次,Kappa架构是严重依赖于消息队列的,我们知道消息队列本身的准确性严格依赖它上游数据的顺序,但是,消息队列越多,发生乱序的可能性越大。通常情况下,ODS层的数据是绝对准确的,把ODS层数据经过计算之后写入到DWD层时就会产生乱序,DWD到DWS更容易产生乱序,这样的数据不一致性问题非常大。
那么有没有一种架构,既能满足实时性的需求,又能满足离线计算的需求,同时还能减轻运营开发成本?解决Kappa架构的痛点呢?
3.3 实时数据仓库建设需求是否有一种技术,既能够保证数据高效的回溯能力,支持数据更新,又能够实现数据的流批读写,并且还能够实现分钟级别的数据接入。
这也是建设实时数据仓库的迫切需要,实际上需要对Kappa架构进行改进升级,以解决Kappa架构中遇到的问题,接下来我们会进一步探讨数据湖技术--Iceberg。
四 Flink+Iceberg构建实时数仓4.1 准实时数据仓库分析系统我们知道Iceberg支持读写分离,又支持并发读、增量读、合并小文件,而且还能做到秒级/分钟级的数据延迟。我们基于Iceberg这些优势,采用Flink+Iceberg的方式构建了流批一体化的实时数据仓库。
在数据仓库处理层,可以用 presto 进行一些简单的查询,因为 Iceberg 支持 Streaming read,所以在系统的中间层也可以直接接入 Flink,直接在中间层用 Flink 做一些批处理或者流式计算的任务,把中间结果做进一步计算后输出到下游。
4.2 采用Iceberg替代Kafka实时数仓的优劣势五 数据湖Apache Iceberg介绍5.1 Iceberg是什么官网对Iceberg的描述如下:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Trino and Spark that use a high-performance format that works just like a SQL table.
Iceberg的官方定义是一种表格式,可以理解为是基于计算层(Spark、Flink)和存储层(ORC、Parquet、Avro)的中间介质层,用Flink或者Spark将数据写入Iceberg,然后通过Presto、Flink、Spark来读取这些表。
5.2 Iceberg的Table格式介绍Iceberg主要是为分析海量数据计算的,被定义为Table Format,Table Format介于计算层和存储层之间。
Table Format向下管理存储系统上的文件,向上为计算层提供接口。比如一张Hive表,在HDFS上会有Partition,存储格式,压缩格式和数据的HDFS目录等,这些信息都维护在元数据中,这些元数据被称为一种文件的组织形式。
Iceberg能够高效支撑上层的计算层访问磁盘上的文件。
5.3 Iceberg的功能总结Iceberg目前支持三种文件格式,Parquet、ORC、Avro,Iceberg的主要功能如下:
5.4 Iceberg的设计5.4.1 设计目标和HIVE模式类似,它也是一种开方的静态数据存储形式,和计算层使用的语言不同。具有强大的扩展性和可靠性:简单透明的使用方式,用户只需要关心写入数据的逻辑,Iceberg会自动识别所有元数据的变更。Iceberg也支持并发写。存储结构高可用:Iceberg有非常合理的Schema管理模式,具有多版本管理机制,支持版本回滚。5.4.2 详细设计自带ACID能力:保障每次写入后的数据都是一个完整的快照(snapshot),每个snapshot包含着一系列的文件列表,落地任务把数据直接写入Iceberg表中,不需要任务再做额外的success状态维护。Iceberg会根据分区字段自动处理延时到来的数据,把延时的数据及时的写入到正确的分区,因为有ACID的保障,延时数据写入过程中Iceberg表依然提供可靠的读取能力。
基于MVCC(Multi Version Concurrency Control)的机制,默认读取文件会从最新的的版本,每次写入都会产生一个新的snapshot,读写相互不干扰。
电脑基于多版本的机制可以可用轻松实现回滚和时间旅行的功能,读取或者回滚任意版本的snapshot数据。
5.4.3 组织架构下图是 Iceberg 整个文件的组织架构。从上往下看:
最上层是 snapshot 模块。snapshot 是用户可读取的基本数据单位,也就是说,每次读取一张表里面的所有数据,都是一个snapshot 里面的数据。中间层manifest。一个 snapshot 下面会有多个 manifest,如图 snapshot-0 有两个 manifest,而 snapshot-1 有三个 manifest,每个 manifest 下面会管理一个或多个 DataFiles 文件。数据层DataFiles。manifest 文件里面存放的就是数据的元信息,我们可以打开 manifest 文件,可以看到里面其实是一行行的 datafiles 文件路径。5.5 Iceberg的读写过程介绍5.5.1 Iceberg的读写如下图所示,虚线框(snapshot-1)表示正在进行写操作,但是还没有发生commit操作,这时候 snapshot-1 是不可读的,用户只能读取已经 电脑 commit 之后的 snapshot。同理, snapshot-2,snapshot-3表示已经可读。
可以支持并发读,例如可以同时读取S1、S2、S3的快照数据,同时,可以回溯到snapshot-2或者snapshot-3。在snapshot-4 commit完成之后,这时候snapshot-4已经变成实线,就可以读取数据了。
例如,现在current Snapshot 的指针移到S3,用户对一张表的读操作,都是读 current Snapshot 指针所指向的 Snapshot,但不会影响前面的 snapshot 的读操作。
5.5.2 增量读取Iceberg的每个snapshot都包含前一个snapshot的所有数据,每次都相当于全量读取数据,对于整个链路来说,读取数据的代价是非常高的。
如果我们只想读取当前时刻的增量数据,就可以根据Iceberg中Snapshot的回溯机制来实现,仅读取Snapshot1到Snapshot2的增量数据,也就是下图中的紫色数据部分。
同理,S3也可以只读取红色部分的增量数据,也可以读取S1-S3的增量数据。
Iceberg支持读写分离,也就是说可以支持并发读和增量读。
电脑5.6 小文件问题5.6.1 实时小文件问题目前Flink社区现在已经重构了 Flink 里面的 FlinkIcebergSink,提供了 global committee 的功能,我们采用的也是社区提供的FlinkIcebergSink,曲线框中的这块内容是 FlinkIcebergSink。
多个 IcebergStreamWriter 和一个 IcebergFileCommitter 的情况下,在上游的数据写到 IcebergStreamWriter 的时候,每个 writer 里面做的事情都是去写 datafiles 文件。
当每个 writer 写完自己当前这一批 datafiles 小文件的时候,就会发送消息给 IcebergFileCommitter,告诉它可以提交了。而 IcebergFileCommitter 收到信息的时,就一次性将 datafiles 的文件提交,进行一次 commit 操作。
commit 操作本身只是对一些原始信息的修改,让其从不可见变成可见。
5.6.2 实时合并小文件在实际的生产环境中,Flink 实时作业会一直在集群中运行,为了要保证数据的时效性,一般会把 Iceberg commit 操作的时间周期设成 30 秒或者是一分钟。当 Flink 作业跑一天时,如果是一分钟一次 commit,一天需要 1440 个 commit,如果 Flink 作业跑一个月commit 操作会更多。甚至 snapshot commit 的时间间隔越短,生成的 snapshot 的数量会越多。当流式作业运行后,就会生成大量的小文件。
Iceberg 小文件合并是在 org.apache.iceberg.actions.RewriteDataFilesAction 类里面实现的。社区中小文件合并其实是通过 Spark 并行计算的,我们参考了社区Spark的实现方法,自己封装了使用Flink合并小文件的方法。
六 总结从上世纪90年代的传统DB型数据仓库演进到大数据时代的Lambda架构数据仓库,Kappa架构再到数据湖的一路演进,数据架构的湖仓一体,数据分析的离线实时一体化。虽然数据湖技术现在还不是特别成熟,但是未来可期。
欢迎大家评论、转发。
电脑