ramy  2021-09-16 17:26:14  赢咖4平台 |   查看评论   

如果size

通过组合可以得出四种基本窗口

time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))

time-sliding-window  有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))

count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)

count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)

3) Window Reduce

WindowedStream → DataStream:给window赋一个reduce功能的函数,并返回一个聚合的结果。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWindowReduce {
 def main(args: Array[String]): Unit = {
   // 获取执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 创建SocketSource
   val stream = env.socketTextStream("node01", 9999)
   // 对stream进行处理并按key聚合
   val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
   // 引入时间窗口
   val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
   // 执行聚合操作
   val streamReduce = streamWindow.reduce(
     (item1, item2) => (item1._1, item1._2 + item2._2)
   )
   // 将聚合数据写入文件
   streamReduce.print()
   // 执行程序
   env.execute("TumblingWindow")
 }
}
4) Window Apply

apply方法可以进行一些自定义处理,通过匿名内部类的方法来实现。当有一些复杂计算时使用。

用法

实现一个 WindowFunction 类指定该类的泛型为 [输入数据类型, 输出数据类型, keyBy中使用分组字段的类型, 窗口类型]

示例:使用apply方法来实现单词统计

步骤:

获取流处理运行环境构建socket流数据源,并指定IP地址和端口号对接收到的数据转换成单词元组使用 keyBy 进行分流(分组)使用 timeWinodw 指定窗口的长度(每3秒计算一次)实现一个WindowFunction匿名内部类apply方法中实现聚合计算使用Collector.collect收集数据

核心代码如下:

   //1. 获取流处理运行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //2. 构建socket流数据源,并指定IP地址和端口号
   val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" "))
   //3. 对接收到的数据转换成单词元组
   val wordDataStream = textDataStream.map(_->1)
   //4. 使用 keyBy 进行分流(分组)
   val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)
   //5. 使用 timeWinodw 指定窗口的长度(每3秒计算一次)
   val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))
   //6. 实现一个WindowFunction匿名内部类
   val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
     //在apply方法中实现数据的聚合
     override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
       println("hello world")
       val tuple = input.reduce((t1, t2) => {
         (t1._1, t1._2 + t2._2)
       })
       //将要返回的数据收集起来,发送回去
       out.collect(tuple)
     }
   })
   reduceDatStream.print()
   env.execute()
5) Window Fold

WindowedStream → DataStream:给窗口赋一个fold功能的函数,并返回一个fold后的结果。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWindowFold {
 def main(args: Array[String]): Unit = {
   // 获取执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 创建SocketSource
   val stream = env.socketTextStream("node01", 9999,'',3)
   // 对stream进行处理并按key聚合
   val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
   // 引入滚动窗口
   val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
   // 执行fold操作
   val streamFold = streamWindow.fold(100){
     (begin, item) =>
       begin + item._2
   }
   // 将聚合数据写入文件
   streamFold.print()
   // 执行程序
   env.execute("TumblingWindow")
 }
}
6) Aggregation on Window

WindowedStream → DataStream:对一个window内的所有元素做聚合操作。min和 minBy的区别是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同样的原理适用于 max 和 maxBy)。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
object StreamWindowAggregation {
 def main(args: Array[String]): Unit = {
   // 获取执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 创建SocketSource
   val stream = env.socketTextStream("node01", 9999)
   // 对stream进行处理并按key聚合
   val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
   // 引入滚动窗口
   val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
   // 执行聚合操作
   val streamMax = streamWindow.max(1)
   // 将聚合数据写入文件
   streamMax.print()
   // 执行程序
   env.execute("TumblingWindow")
 }
}
4. EventTime与Window1) EventTime的引入与现实世界中的时间是不一致的,在flink中被划分为事件时间,提取时间,处理时间三种。如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带EventTime如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,以source的systemTime为准。如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow,以operator的systemTime为准。

在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2) Watermark

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的,所以 Flink 最初设计的时候,就考虑到了网络延迟,网络乱序等问题,所以提出了一个抽象概念:水印(WaterMark);

Flink

如上图所示,就出现一个问题,一旦出现乱序,如果只根据 EventTime 决定 Window 的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发 Window 去进行计算了,这个特别的机制,就是 Watermark。

Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 Window 来实现。烹饪1-375

数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,Window 的执行也是由 Watermark 触发的。

Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 EventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。

有序流的Watermarker如下图所示:(Watermark设置为0)

Flink

有序数据的Watermark

乱序流的Watermarker如下图所示:(Watermark设置为2)

Flink

无序数据的Watermark

当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。

3) Flink对于迟到数据的处理

waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,于延迟的数据Flink也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据。

设置允许延迟的时间是通过 allowedLateness(lateness: Time) 设置

保存延迟数据则是通过 sideOutputLateData(outputTag: OutputTag[T]) 保存

获取延迟数据是通过 DataStream.getSideOutput(tag: OutputTag[X]) 获取

具体的用法如下:

allowedLateness(lateness: Time)

def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
 javaStream.allowedLateness(lateness)
 this
}

该方法传入一个Time值,设置允许数据迟到的时间,这个时间和 WaterMark 中的时间概念不同。再来回顾一下:

WaterMark=数据的事件时间-允许乱序时间值

随着新数据的到来,waterMark的值会更新为最新数据事件时间-允许乱序时间值,但是如果这时候来了一条历史数据,waterMark值则不会更新。总的来说,waterMark是为了能接收到尽可能多的乱序数据。

那这里的Time值,主要是为了等待迟到的数据,在一定时间范围内,如果属于该窗口的数据到来,仍会进行计算,后面会对计算方式仔细说明

注意:该方法只针对于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值则会抛出异常。

sideOutputLateData(outputTag: OutputTag[T])

def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
 javaStream.sideOutputLateData(outputTag)
 this
}

该方法是将迟来的数据保存至给定的outputTag参数,而OutputTag则是用来标记延迟数据的一个对象。

DataStream.getSideOutput(tag: OutputTag[X])

通过window等操作返回的DataStream调用该方法,传入标记延迟数据的对象来获取延迟的数据。

对延迟数据的理解

延迟数据是指:

在当前窗口【假设窗口范围为10-15】已经计算之后,又来了一个属于该窗口的数据【假设事件时间为13】,这时候仍会触发 Window 操作,这种数据就称为延迟数据。

那么问题来了,延迟时间怎么计算呢?

假设窗口范围为10-15,延迟时间为2s,则只要 WaterMark<15+2,并且属于该窗口,就能触发 Window 操作。而如果来了一条数据使得 WaterMark>=15+2,10-15这个窗口就不能再触发 Window 操作,即使新来的数据的 Event Time 属于这个窗口时间内 。
 

4) Flink 关联 Hive 分区表

Flink 1.12 支持了 Hive 最新的分区作为时态表的功能,可以通过 SQL 的方式直接关联 Hive 分区表的最新分区,并且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地做维表数据的全量替换。通过这种方式,用户无需编写 DataStream 程序即可完成 Kafka 流实时关联最新的 Hive 分区实现数据打宽。

具体用法:

在 Sql Client 中注册 HiveCatalog:

vim conf/sql-client-defaults.yaml
catalogs:
 - name: hive_catalog
   type: hive
   hive-conf-dir: /disk0/soft/hive-conf/ #该目录需要包hive-site.xml文件

创建 Kafka 表


CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  

 

除特别注明外,本站所有文章均为 赢咖4注册 原创,转载请注明出处来自一文详解Flink知识体系_烹饪1-375

留言与评论(共有 0 条评论)
   
验证码:
[lianlun]1[/lianlun]