最新公告
  • 欢迎光临数据科学与编程,我们是数据学科学兴趣交流小组立即加入我们
  • 基于Flink构建一体数仓的技术点

    基于Flink构建流批一体的实时数仓是目前数据仓库领域比较火的实践方案。随着Flink的不断迭代,其提供的一系列技术特性使得用户构建流批一体的应用变得越来越方便。本文将以Flink1.12为例,一一介绍这些特性的基本使用方式,主要包括以下内容:

    • Flink集成Hive
    • Hive Catalog与Hive Dialect
    • Flink读写Hive
    • Flink upsert-kafka连接器
    • Flink CDC的connector

    尖叫提示:本文内容较长,建议收藏

    Flink集成Hive

    使用Hive构建数据仓库已经成为了比较普遍的一种解决方案。目前,一些比较常见的大数据处理引擎,都无一例外兼容Hive。Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在Flink1.10版本中,标志着对 Blink的整合宣告完成,对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的Flink对于Hive的集成有所差异,本文将以最新的Flink1.12版本为例,阐述Flink集成Hive的简单步骤,以下是全文,希望对你有所帮助。

    Flink集成Hive的基本方式

    Flink 与 Hive 的集成主要体现在以下两个方面:

    • 持久化元数据

    Flink利用 Hive 的 MetaStore 作为持久化的 Catalog,我们可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,我们可以使用HiveCatalog将其 Kafka的数据源表存储在 Hive Metastore 中,这样该表的元数据信息会被持久化到Hive的MetaStore对应的元数据库中,在后续的 SQL 查询中,我们可以重复使用它们。

    • 利用 Flink 来读写 Hive 的表。

    Flink打通了与Hive的集成,如同使用SparkSQL或者Impala操作Hive中的数据一样,我们可以使用Flink直接读写Hive中的表。

    HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive表。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

    Flink集成Hive的步骤

    Flink支持的Hive版本

    大版本 V1 V2 V3 V4 V5 V6 V7
    1.0 1.0.0 1.0.1
    1.1 1.1.0 1.1.1
    1.2 1.2.0 1.2.1 1.2.2
    2.0 2.0.0 2.0.1
    2.1 2.1.0 2.1.1
    2.2 2.2.0
    2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6
    3.1 3.1.0 3.1.1 3.1.2

    值得注意的是,对于不同的Hive版本,可能在功能方面有所差异,这些差异取决于你使用的Hive版本,而不取决于Flink,一些版本的功能差异如下:

    • Hive 内置函数在使用 Hive-1.2.0 及更高版本时支持。
    • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
    • 更改表的统计信息,在使用 Hive-1.2.0 及更高版本时支持。
    • DATE列统计信息,在使用 Hive-1.2.0 及更高版时支持。
    • 使用 Hive-2.0.x 版本时不支持写入 ORC 表。

    依赖项

    本文以Flink1.12为例,集成的Hive版本为Hive2.3.4。集成Hive需要额外添加一些依赖jar包,并将其放置在Flink安装目录下的lib文件夹下,这样我们才能通过 Table API 或 SQL Client 与 Hive 进行交互。

    另外,Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖,配置好HADOOP_CLASSPATH即可。这一点非常重要,否则在使用FlinkSQL Cli查询Hive中的表时,会报如下错误:

    Flink官网提供了两种方式添加Hive的依赖项。第一种是使用 Flink 提供的 Hive Jar包(根据使用的 Metastore 的版本来选择对应的 Hive jar),建议优先使用Flink提供的Hive jar包,这种方式比较简单方便。本文使用的就是此种方式。当然,如果你使用的Hive版本与Flink提供的Hive jar包兼容的版本不一致,你可以选择第二种方式,即别添加每个所需的 jar 包。

    下面列举了可用的jar包及其适用的Hive版本,我们可以根据使用的Hive版本,下载对应的jar包即可。比如本文使用的Hive版本为Hive2.3.4,所以只需要下载flink-sql-connector-hive-2.3.6即可,并将其放置在Flink安装目录的lib文件夹下。

    Metastore version Maven dependency SQL Client JAR
    1.0.0 ~ 1.2.2 flink-sql-connector-hive-1.2.2 Download
    2.0.0 ~2.2.0 flink-sql-connector-hive-2.2.0 Download
    2.3.0 ~2.3.6 flink-sql-connector-hive-2.3.6 Download
    3.0.0 ~ 3.1.2 flink-sql-connector-hive-3.1.2 Download

    上面列举的jar包,是我们在使用Flink SQL Cli所需要的jar包,除此之外,根据不同的Hive版本,还需要添加如下jar包。以Hive2.3.4为例,除了上面的一个jar包之外,还需要添加下面两个jar包:

    flink-connector-hive_2.11-1.12.0.jarhive-exec-2.3.4.jar。其中hive-exec-2.3.4.jar包存在于Hive安装路径下的lib文件夹。flink-connector-hive_2.11-1.12.0.jar的下载地址为:

    NOTE:black_nib::Flink1.12集成Hive只需要添加如下三个jar包,以Hive2.3.4为例,分别为:

    flink-sql-connector-hive-2.3.6

    flink-connector-hive_2.11-1.12.0.jar

    hive-exec-2.3.4.jar

    Flink SQL Cli集成Hive

    将上面的三个jar包添加至Flink的lib目录下之后,就可以使用Flink操作Hive的数据表了。以FlinkSQL Cli为例:

    配置sql-client-defaults.yaml

    该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,具体的配置如下,主要是配置catalog:

    除了上面的一些配置参数,Flink还提供了下面的一些其他配置参数:

    参数 必选 默认值 类型 描述
    type (无) String Catalog 的类型。创建 HiveCatalog 时,该参数必须设置为'hive'
    name (无) String Catalog 的名字。仅在使用 YAML file 时需要指定。
    hive-conf-dir (无) String 指向包含 hive-site.xml 目录的 URI。该 URI 必须是 Hadoop 文件系统所支持的类型。如果指定一个相对 URI,即不包含 scheme,则默认为本地文件系统。如果该参数没有指定,我们会在 class path 下查找hive-site.xml。
    default-database default String 当一个catalog被设为当前catalog时,所使用的默认当前database。
    hive-version (无) String HiveCatalog 能够自动检测使用的 Hive 版本。我们建议不要手动设置 Hive 版本,除非自动检测机制失败。
    hadoop-conf-dir (无) String Hadoop 配置文件目录的路径。目前仅支持本地文件系统路径。我们推荐使用 HADOOP_CONF_DIR 环境变量来指定 Hadoop 配置。因此仅在环境变量不满足您的需求时再考虑使用该参数,例如当您希望为每个 HiveCatalog 单独设置 Hadoop 配置时。

    操作Hive中的表

     

    向Hive表users中插入一条数据:

    再次使用Hive客户端去查询该表的数据,会发现写入了一条数据。

    接下来,我们再在FlinkSQL Cli中创建一张kafka的数据源表:

    NOTE:black_flag::在Flink中创建一张表,会把该表的元数据信息持久化到Hive的metastore中,我们可以在Hive的metastore中查看该表的元数据信息

    使用代码连接到 Hive

    代码

    Hive Catalog与Hive Dialect

    什么是Hive Catalog

    我们知道,Hive使用Hive Metastore(HMS)存储元数据信息,使用关系型数据库来持久化存储这些信息。所以,Flink集成Hive需要打通Hive的metastore,去管理Flink的元数据,这就是Hive Catalog的功能。

    Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元数据。Hive Catalog可以将元数据进行持久化,这样后续的操作就可以反复使用这些表的元数据,而不用每次使用时都要重新注册。如果不去持久化catalog,那么在每个session中取处理数据,都要去重复地创建元数据对象,这样是非常耗时的。

    如何使用Hive Catalog

    HiveCatalog是开箱即用的,所以,一旦配置好Flink与Hive集成,就可以使用HiveCatalog。比如,我们通过FlinkSQL 的DDL语句创建一张kafka的数据源表,立刻就能查看该表的元数据信息。

    HiveCatalog可以处理两种类型的表:一种是Hive兼容的表,另一种是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式来存储的,所以,对于Hive兼容表而言,我们既可以使用Flink去操作该表,又可以使用Hive去操作该表。

    普通表是对Flink而言的,当使用HiveCatalog创建一张普通表,仅仅是使用Hive MetaStore将其元数据进行了持久化,所以可以通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),但是不能通过Hive去处理这些表,因为语法不兼容。

    对于是否是普通表,Flink使用is_generic属性进行标识。默认情况下,创建的表是普通表,即is_generic=true,如果要创建Hive兼容表,需要在建表属性中指定is_generic=false

    尖叫提示:

    由于依赖Hive Metastore,所以必须开启Hive MetaStore服务

    代码中使用Hive Catalog

    Flink SQLCli中使用Hive Catalog

    对于Hive兼容的表,需要注意数据类型,具体的数据类型对应关系以及注意点如下

    Flink 数据类型 Hive 数据类型
    CHAR(p) CHAR(p)
    VARCHAR(p) VARCHAR(p)
    STRING STRING
    BOOLEAN BOOLEAN
    TINYINT TINYINT
    SMALLINT SMALLINT
    INT INT
    BIGINT LONG
    FLOAT FLOAT
    DOUBLE DOUBLE
    DECIMAL(p, s) DECIMAL(p, s)
    DATE DATE
    TIMESTAMP(9) TIMESTAMP
    BYTES BINARY
    ARRAY LIST
    MAP<K, V> MAP<K, V>
    ROW STRUCT

    注意

    • Hive CHAR(p) 类型的最大长度为255
    • Hive VARCHAR(p)类型的最大长度为65535
    • Hive MAP类型的key仅支持基本类型,而Flink’s MAP 类型的key执行任意类型
    • Hive不支持联合数据类型,比如STRUCT
    • Hive’s TIMESTAMP 的精度是 9 , Hive UDFs函数只能处理 precision <= 9的 TIMESTAMP
    • Hive 不支持 Flink提供的 TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE, 及MULTISET类型
    • FlinkINTERVAL 类型与 Hive INTERVAL 类型不一样

    上面介绍了普通表和Hive兼容表,那么我们该如何使用Hive的语法进行建表呢?这个时候就需要使用Hive Dialect

    什么是Hive Dialect

    从Flink1.11.0开始,只要开启了Hive dialect配置,用户就可以使用HiveQL语法,这样我们就可以在Flink中使用Hive的语法使用一些DDL和DML操作。

    Flink目前支持两种SQL方言(SQL dialects),分别为:default和hive。默认的SQL方言是default,如果要使用Hive的语法,需要将SQL方言切换到hive

    如何使用Hive Dialect

    在SQL Cli中使用Hive dialect

    使用hive dialect只需要配置一个参数即可,该参数名称为:table.sql-dialect。我们就可以在sql-client-defaults.yaml配置文件中进行配置,也可以在具体的会话窗口中进行设定,对于SQL dialect的切换,不需要进行重启session。

    尖叫提示:

    一旦切换到了hive dialect,就只能使用Hive的语法建表,如果尝试使用Flink的语法建表,则会报错

     

    • Hive dialect只能用于操作Hive表,不能用于普通表。Hive方言应与HiveCatalog一起使用。
    • 虽然所有Hive版本都支持相同的语法,但是是否有特定功能仍然取决于使用的Hive版本。例如,仅在Hive-2.4.0或更高版本中支持更新数据库位置。
    • Hive和Calcite具有不同的保留关键字。例如,default在Calcite中是保留关键字,在Hive中是非保留关键字。所以,在使用Hive dialect时,必须使用反引号()引用此类关键字,才能将其用作标识符。
    • 在Hive中不能查询在Flink中创建的视图。

    当然,一旦开启了Hive dialect,我们就可以按照Hive的操作方式在Flink中去处理Hive的数据了,具体的操作与Hive一致,本文不再赘述。

    Flink读写Hive

    Flink写入Hive表

    Flink支持以**批处理(Batch)和流处理(Streaming)**的方式写入Hive表。当以批处理的方式写入Hive表时,只有当写入作业结束时,才可以看到写入的数据。批处理的方式写入支持append模式和overwrite模式

    批处理模式写入

    流处理模式写入

    关于Hive表的一些属性解释:

    • partition.time-extractor.timestamp-pattern

      • 默认值:(none)
      • 解释:分区时间抽取器,与 DDL 中的分区字段保持一致,如果是按天分区,则可以是**year-day ,如果是按天时进行分区,则该属性值为:dt $hour:00:00;
    • sink.partition-commit.trigger

      • process-time:不需要时间提取器和水位线,当当前时间大于分区创建时间 + sink.partition-commit.delay 中定义的时间,提交分区;
      • partition-time:需要 Source 表中定义 watermark,当 watermark > 提取到的分区时间 +sink.partition-commit.delay 中定义的时间,提交分区;
      • 默认值:process-time
      • 解释:分区触发器类型,可选 process-time 或partition-time
    • sink.partition-commit.delay

      • 默认值:0S
      • 解释:分区提交的延时时间,如果是按天分区,则该属性的值为:1d,如果是按小时分区,则该属性值为1h;
    • sink.partition-commit.policy.kind

      • metastore:添加分区的元数据信息,仅Hive表支持该值配置
      • success-file:在表的存储路径下添加一个_SUCCESS文件
      • 默认值:(none)

      • 解释:提交分区的策略,用于通知下游的应用该分区已经完成了写入,也就是说该分区的数据可以被访问读取。可选的值如下:

        可以同时配置上面的两个值,比如metastore,success-file

    同时查看Hive表的分区数据:

    尖叫提示:

    1.Flink读取Hive表默认使用的是batch模式,如果要使用流式读取Hive表,需要而外指定一些参数,见下文。

    2.只有在完成 Checkpoint 之后,文件才会从 In-progress 状态变成 Finish 状态,同时生成_SUCCESS文件,所以,Flink流式写入Hive表需要开启并配置 Checkpoint。对于Flink SQL Client而言,需要在flink-conf.yaml中开启CheckPoint,配置内容为:

    state.backend: filesystem execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION execution.checkpointing.interval: 60s execution.checkpointing.mode: EXACTLY_ONCE state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints

    Flink读取Hive表

    Flink支持以**批处理(Batch)和流处理(Streaming)**的方式读取Hive中的表。批处理的方式与Hive的本身查询类似,即只在提交查询的时刻查询一次Hive表。流处理的方式将会持续地监控Hive表,并且会增量地提取新的数据。默认情况下,Flink是以批处理的方式读取Hive表

    关于流式读取Hive表,Flink既支持分区表又支持非分区表。对于分区表而言,Flink将会监控新产生的分区数据,并以增量的方式读取这些数据。对于非分区表,Flink会监控Hive表存储路径文件夹里面的新文件,并以增量的方式读取新的数据。

    Flink读取Hive表可以配置一下参数:

    • streaming-source.enable

      • 默认值:false
      • 解释:是否开启流式读取 Hive 表,默认不开启。
    • streaming-source.partition.include

      • 默认值:all
      • 解释:配置读取Hive的分区,包括两种方式:all和latest。all意味着读取所有分区的数据,latest表示只读取最新的分区数据。值得注意的是,latest方式只能用于开启了流式读取Hive表,并用于维表JOIN的场景。
    • streaming-source.monitor-interval

      • 默认值:None
      • 解释:持续监控Hive表分区或者文件的时间间隔。值得注意的是,当以流的方式读取Hive表时,该参数的默认值是1m,即1分钟。当temporal join时,默认的值是60m,即1小时。另外,该参数配置不宜过短 ,最短是1 个小时,因为目前的实现是每个 task 都会查询 metastore,高频的查可能会对metastore 产生过大的压力。
    • streaming-source.partition-order

      • 默认值:partition-name
      • 解释:streaming source的分区顺序。默认的是partition-name,表示使用默认分区名称顺序加载最新分区,也是推荐使用的方式。除此之外还有两种方式,分别为:create-time和partition-time。其中create-time表示使用分区文件创建时间顺序。partition-time表示使用分区时间顺序。指的注意的是,对于非分区表,该参数的默认值为:create-time
    • streaming-source.consume-start-offset

      • 默认值:None
      • 解释:流式读取Hive表的起始偏移量。
    • partition.time-extractor.kind
      • 默认值:default
      • 分区时间提取器类型。用于从分区中提取时间,支持default和自定义。如果使用default,则需要通过参数partition.time-extractor.timestamp-pattern配置时间戳提取的正则表达式。

         

    Hive维表JOIN

    Flink 1.12 支持了 Hive 最新的分区作为时态表的功能,可以通过 SQL 的方式直接关联 Hive 分区表的最新分区,并且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地做维表数据的全量替换。

    Flink支持的是processing-time的temporal join,也就是说总是与最新版本的时态表进行JOIN。另外,Flink既支持非分区表的temporal join,又支持分区表的temporal join。对于分区表而言,Flink会监听Hive表的最新分区数据。值得注意的是,Flink尚不支持 event-time temporal join。

    Temporal Join最新分区

    对于一张随着时间变化的Hive分区表,Flink可以读取该表的数据作为一个无界流。如果Hive分区表的每个分区都包含全量的数据,那么每个分区将做为一个时态表的版本数据,即将最新的分区数据作为一个全量维表数据。值得注意的是,该功能特点仅支持Flink的STREAMING模式。

    Temporal Join最新表

    对于Hive的非分区表,当使用temporal join时,整个Hive表会被缓存到Slot内存中,然后根据流中的数据对应的key与其进行匹配。使用最新的Hive表进行temporal join不需要进行额外的配置,我们只需要配置一个Hive表缓存的TTL时间,该时间的作用是:当缓存过期时,就会重新扫描Hive表并加载最新的数据。

    • lookup.join.cache.ttl

      尖叫提示:

      当使用此种方式时,Hive表必须是有界的lookup表,即非Streaming Source的时态表,换句话说,该表的属性streaming-source.enable = false

      如果要使用Streaming Source的时态表,记得配置streaming-source.monitor-interval的值,即数据更新的时间间隔。

      • 默认值:60min
      • 解释:表示缓存时间。由于 Hive 维表会把维表所有数据缓存在 TM 的内存中,当维表数据量很大时,很容易造成 OOM。当然TTL的时间也不能太短,因为会频繁地加载数据,从而影响性能。

     

    尖叫提示:

    1.每一个子任务都需要缓存一份维表的全量数据,一定要确保TM的task Slot 大小能够容纳维表的数据量;

    2.推荐将streaming-source.monitor-interval和lookup.join.cache.ttl的值设为一个较大的数,因为频繁的更新和加载数据会影响性能。

    3.当缓存的维表数据需要重新刷新时,目前的做法是将整个表进行加载,因此不能够将新数据与旧数据区分开来。

     

    Hive维表JOIN示例

    假设维表的数据是通过批处理的方式(比如每天)装载至Hive中,而Kafka中的事实流数据需要与该维表进行JOIN,从而构建一个宽表数据,这个时候就可以使用Hive的维表JOIN。

    Flink upsert-kafka连接器

    Upsert Kafka connector简介

    Upsert Kafka Connector允许用户以upsert的方式从Kafka主题读取数据或将数据写入Kafka主题。

    当作为数据源时,upsert-kafka Connector会生产一个changelog流,其中每条数据记录都表示一个更新或删除事件。更准确地说,如果不存在对应的key,则视为INSERT操作。如果已经存在了相对应的key,则该key对应的value值为最后一次更新的值。

    用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。

    当作为数据汇时,upsert-kafka Connector会消费一个changelog流。它将INSERT / UPDATE_AFTER数据作为正常的Kafka消息值写入(即INSERT和UPDATE操作,都会进行正常写入,如果是更新,则同一个key会存储多条数据,但在读取该表数据时,只保留最后一次更新的值),并将 DELETE 数据以 value 为空的 Kafka 消息写入(key被打上墓碑标记,表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中

    依赖

    如果使用SQL Client,需要下载flink-sql-connector-kafka_2.11-1.12.0.jar,并将其放置在Flink安装目录的lib文件夹下。

    使用方式

    尖叫提示:

    要使用 upsert-kafka connector,必须在创建表时使用PRIMARY KEY定义主键,并为键(key.format)和值(value.format)指定序列化反序列化格式。

    upsert-kafka connector参数

    • connector

    必选。指定要使用的连接器,Upsert Kafka 连接器使用:'upsert-kafka'

    • topic

    必选。用于读取和写入的 Kafka topic 名称。

    • properties.bootstrap.servers

    必选。以逗号分隔的 Kafka brokers 列表。

    • key.format

    必选。用于对 Kafka 消息中 key 部分序列化和反序列化的格式。key 字段由 PRIMARY KEY 语法指定。支持的格式包括 'csv''json''avro'

    • value.format

    必选。用于对 Kafka 消息中 value 部分序列化和反序列化的格式。支持的格式包括 'csv''json''avro'

    • *properties. **

    可选。该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。但是,某些选项,例如'key.deserializer''value.deserializer' 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

    • value.fields-include

    可选,默认为ALL。控制key字段是否出现在 value 中。当取ALL时,表示消息的 value 部分将包含 schema 中所有的字段,包括定义为主键的字段。当取EXCEPT_KEY时,表示记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。

    • key.fields-prefix

    可选。为了避免与value字段命名冲突,为key字段添加一个自定义前缀。默认前缀为空。一旦指定了key字段的前缀,必须在DDL中指明前缀的名称,但是在构建key的序列化数据类型时,将移除该前缀。见下面的示例。在需要注意的是:使用该配置属性,value.fields-include的值必须为EXCEPT_KEY

     

    尖叫提示:

    如果指定了key字段前缀,但在DDL中并没有添加该前缀字符串,那么在向该表写入数时,会抛出下面异常:

    [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: All fields in ‘key.fields’ must be prefixed with ‘qwe’ when option ‘key.fields-prefix’ is set but field ‘do_date’ is not prefixed.

    • sink.parallelism

    可选。定义 upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。

    其他注意事项

    Key和Value的序列化格式

    关于Key、value的序列化可以参考Kafka connector。值得注意的是,必须指定Key和Value的序列化格式,其中Key是通过PRIMARY KEY指定的。

    Primary Key约束

    Upsert Kafka 工作在 upsert 模式(FLIP-149)下。当我们创建表时,需要在 DDL 中定义主键。具有相同key的数据,会存在相同的分区中。在 changlog source 上定义主键意味着在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在 Kafka 消息的 key 中。

    一致性保障

    默认情况下,如果启用 checkpoint,Upsert Kafka sink 会保证至少一次将数据插入 Kafka topic。

    这意味着,Flink 可以将具有相同 key 的重复记录写入 Kafka topic。但由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。因此,upsert-kafka 连接器可以像 HBase sink 一样实现幂等写入。

    分区水位线

    Flink 支持根据 Upsert Kafka 的 每个分区的数据特性发送相应的 watermark。当使用这个特性的时候,watermark 是在 Kafka consumer 内部生成的。合并每个分区生成的 watermark 的方式和 streaming shuffle 的方式是一致的(单个分区的输入取最大值,多个分区的输入取最小值)。数据源产生的 watermark 是取决于该 consumer 负责的所有分区中当前最小的 watermark。如果该 consumer 负责的部分分区是空闲的,那么整体的 watermark 并不会前进。在这种情况下,可以通过设置合适的 table.exec.source.idle-timeout 来缓解这个问题。

    数据类型

    Upsert Kafka 用字节bytes存储消息的 key 和 value,因此没有 schema 或数据类型。消息按格式进行序列化和反序列化,例如:csv、json、avro。不同的序列化格式所提供的数据类型有所不同,因此需要根据使用的序列化格式进行确定表字段的数据类型是否与该序列化类型提供的数据类型兼容。

    使用案例

    本文以实时地统计网页PV和UV的总量为例,介绍upsert-kafka基本使用方式:

    • Kafka 数据源

    用户的ippv信息,一个用户在一天内可以有很多次pv

    • 生产用户访问数据到kafka,向kafka中的user_ippv插入数据:

    可以看出:每分钟的pv、uv只显示一条数据,即代表着截止到当前时间点的pv和uv

    查看Kafka中result_total_pvuv_min主题的数据,如下:

    可以看出:针对每一条访问数据,触发计算了一次PV、UV,每一条数据都是截止到当前时间的累计PV和UV。

    尖叫提示:

    默认情况下,如果在启用了检查点的情况下执行查询,Upsert Kafka接收器会将具有至少一次保证的数据提取到Kafka主题中。

    这意味着,Flink可能会将具有相同键的重复记录写入Kafka主题。但是,由于连接器在upsert模式下工作,因此作为源读回时,同一键上的最后一条记录将生效。因此,upsert-kafka连接器就像HBase接收器一样实现幂等写入。

    Flink CDC的connector

    简介

    Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC)从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。

    特点

    • 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义

    • 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。

    • 对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。

    使用场景

    • 数据库之间的增量数据同步
    • 审计日志
    • 数据库之上的实时物化视图
    • 基于CDC的维表join

    Flink提供的 table format

    Flink提供了一系列可以用于table connector的table format,具体如下:

    Formats Supported Connectors
    CSV Apache Kafka, Filesystem
    JSON Apache Kafka, Filesystem, Elasticsearch
    Apache Avro Apache Kafka, Filesystem
    Debezium CDC Apache Kafka
    Canal CDC Apache Kafka
    Apache Parquet Filesystem
    Apache ORC Filesystem

    使用过程中的注意点

    使用MySQL CDC的注意点

    如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

    使用canal-json的注意点

    如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。由于Flink1.11的安装包 的lib目录下并没有提供该jar包,所以必须要手动添加依赖包,否则会报如下错误:

    mysql-cdc

    使用changelog-json的注意点

    如果要使用Kafka的changelog-json Format,对于程序而言,

    如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

    mysql-cdc的操作实践

    创建MySQL数据源表

    Flink SQL Cli创建CDC数据源

     

    在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert

     

    查询结果如下:

    执行JOIN操作:

    canal-json的操作实践

    关于cannal的使用方式,可以参考我的另一篇文章:基于Canal与Flink实现数据实时增量同步(一)。我已经将下面的表通过canal同步到了kafka,

    查询结果如下:

    changelog-json的操作实践

    创建MySQL数据源

    参见上面的order_info

    查询表看一下结果:

    当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再观察数据:

    再看kafka中的数据:

    总结

    本文主要介绍了基于FlinK构建实时数仓的技术点,并对其使用方式进行了详细描述.

    本站上原创文章未经作者许可,不得用于商业用途,仅做学习交流使用,本站免责声明。转载请注明出处,否则保留追究法律责任的权利。《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权
    数据科学与编程 » 基于Flink构建一体数仓的技术点

    发表评论

    • 52会员总数(位)
    • 320资源总数(个)
    • 25本周发布(个)
    • 3 今日发布(个)
    • 333稳定运行(天)

    提供最优质的博文资源集合

    立即阅览 了解详情