ramy  2021-09-16 17:26:14  赢咖4平台 |   查看评论   

   master Row

Flink 事实表与 Hive 最新分区数据关联

dim_extend_shop_info 是 Hive 中已存在的表,所以我们用 table hint 动态地开启维表参数。


CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
SELECT * FROM  
(select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,  
    ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
   from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
      JOIN hive_catalog.flink_db.dim_extend_shop_info  
 + OPTIONS('streaming-source.enable'='true',  
    'streaming-source.partition.include' = 'latest',  
    'streaming-source.monitor-interval' = '1 h',
    'streaming-source.partition-order' = 'partition-name')
   FOR SYSTEM_TIME AS OF t1.proctime AS t2 --时态表  
   ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
   where groupID in (202042)) t  where t.rn = 1

参数解释:

streaming-source.enable 开启流式读取 Hive 数据。

streaming-source.partition.include 有以下两个值:

latest 属性: 只读取最新分区数据。all: 读取全量分区数据 ,默认值为 all,表示读所有分区,latest 只能用在 temporal join 中,用于读取最新分区作为维表,不能直接读取最新分区数据。

streaming-source.monitor-interval 监听新分区生成的时间、不宜过短 、最短是1 个小时,因为目前的实现是每个 task 都会查询 metastore,高频的查可能会对metastore 产生过大的压力。需要注意的是,1.12.1 放开了这个限制,但仍建议按照实际业务不要配个太短的 interval。

streaming-source.partition-order 分区策略,主要有以下 3 种,其中最为推荐的是 partition-name:

partition-name 使用默认分区名称顺序加载最新分区create-time 使用分区文件创建时间顺序partition-time 使用分区时间顺序六、Flink 状态管理

我们前面写的 wordcount 的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。

因此可以说flink因为引入了state和checkpoint所以才支持的exactly once

首先区分一下两个概念:

state:

state一般指一个具体的task/operator的状态:

state数据默认保存在java的堆内存中,TaskManage节点的内存中。

operator表示一些算子在运行的过程中会产生的一些中间结果。

checkpoint:

checkpoint可以理解为checkpoint是把state数据定时持久化存储了,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。

注意:task(subTask)是Flink中执行的基本单位。operator指算子(transformation)

State可以被记录,在失败的情况下数据还可以恢复。

Flink中有两种基本类型的State:

Keyed State

Operator State

Keyed State和Operator State,可以以两种形式存在:

原始状态(raw state)

托管状态(managed state)

托管状态是由Flink框架管理的状态。

我们说operator算子保存了数据的中间结果,中间结果保存在什么类型中,如果我们这里是托管状态,则由flink框架自行管理

原始状态由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。

1. State-Keyed State

基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解为分区过的Operator State。

保存state的数据结构:

ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。

ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。

MapState

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄。

1. ValueState

使用ValueState保存中间结果对下面数据进行分组求和。

开发步骤:

1. 获取流处理执行环境
 2. 加载数据源
 3. 数据分组
 4. 数据转换,定义ValueState,保存中间结果
 5. 数据打印
 6. 触发执行

ValueState:测试数据源:

List(
  (1L, 4L),
  (2L, 3L),
  (3L, 1L),
  (1L, 2L),
  (3L, 2L),
  (1L, 2L),
  (2L, 2L),
  (2L, 9L)
)

示例代码:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
object TestKeyedState {
 class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
   *
    * ValueState状态句柄. 第一个值为count,第二个值为sum。
   
   private var sum: ValueState[(Long, Long)] = _
   override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
     // 获取当前状态值
     val tmpCurrentSum: (Long, Long) = sum.value
     // 状态默认值
     val currentSum = if (tmpCurrentSum != null) {
       tmpCurrentSum
     } else {
       (0L, 0L)
     }
     // 更新
     val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
     // 更新状态值
     sum.update(newSum)
     // 如果count >=3 清空状态值,重新计算
     if (newSum._1 >= 3) {
       out.collect((input._1, newSum._2 / newSum._1))
       sum.clear()
     }
   }
   override def open(parameters: Configuration): Unit = {
     sum = getRuntimeContext.getState(
       new ValueStateDescriptor[(Long, Long)]("average", // 状态名称
         TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 状态类型
     )
   }
 }  
 def main(args: Array[String]): Unit = {
   //初始化执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //构建数据源
   val inputStream: DataStream[(Long, Long)] = env.fromCollection(
     List(
       (1L, 4L),
       (2L, 3L),
       (3L, 1L),
       (1L, 2L),
       (3L, 2L),
       (1L, 2L),
       (2L, 2L),
       (2L, 9L))
   )
   //执行数据处理
   inputStream.keyBy(0)
     .flatMap(new CountWithKeyedState)
     .setParallelism(1)
     .print
   //运行任务
   env.execute
 }
}  
2. MapState

使用MapState保存中间结果对下面数据进行分组求和:

1. 获取流处理执行环境
 2. 加载数据源
 3. 数据分组
 4. 数据转换,定义MapState,保存中间结果
 5. 数据打印
 6. 触发执行

MapState:测试数据源:

List(
  ("java", 1),
  ("python", 3),
  ("java", 2),
  ("scala", 2),
  ("python", 1),
  ("java", 1),
  ("scala", 2)
)  

示例代码:

object MapState {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   *
     * 使用MapState保存中间结果对下面数据进行分组求和
     * 1.获取流处理执行环境
     * 2.加载数据源
     * 3.数据分组
     * 4.数据转换,定义MapState,保存中间结果
     * 5.数据打印
     * 6.触发执行
     
   val source: DataStream[(String, Int)] = env.fromCollection(List(
     ("java", 1),
     ("python", 3),
     ("java", 2),
     ("scala", 2),
     ("python", 1),
     ("java", 1),
     ("scala", 2)))
 
   source.keyBy(0)
     .map(new RichMapFunction[(String, Int), (String, Int)] {
       var mste: MapState[String, Int] = _
       override def open(parameters: Configuration): Unit = {
         val msState = new MapStateDescriptor[String, Int]("ms",
           TypeInformation.of(new TypeHint[(String)] {}),
           TypeInformation.of(new TypeHint[(Int)] {}))
         mste = getRuntimeContext.getMapState(msState)
       }
       override def map(value: (String, Int)): (String, Int) = {
         val i: Int = mste.get(value._1)
         mste.put(value._1, value._2 + i)
         (value._1, value._2 + i)
       }
     }).print()
   env.execute()
 }
}  
2. State-Operator State

与Key无关的State,与Operator绑定的state,整个operator只对应一个state。

保存state的数据结构:

ListState

举例来说,Flink中的 Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

步骤:

获取执行环境

设置检查点机制:路径,重启策略

自定义数据源

需要继承并行数据源和CheckpointedFunction设置listState,通过上下文对象context获取数据处理,保留offset制作快照

数据打印

触发执行

示例代码:

import java.util
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object ListOperate {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   env.enableCheckpointing(5000)
   env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
   env.getCheckpointConfig.setCheckpointTimeout(60000)
   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
   env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   //重启策略
   env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))
   //模拟kakfa偏移量
   env.addSource(new MyRichParrelSourceFun)
     .print()

 

除特别注明外,本站所有文章均为 赢咖4注册 原创,转载请注明出处来自一文详解Flink知识体系_烹饪1-375

留言与评论(共有 0 条评论)
   
验证码:
[lianlun]1[/lianlun]