訂閱
糾錯
加入自媒體

一文詳解Flink知識體系

2021-09-13 09:58
園陌
關注

4) Flink 關聯(lián) Hive 分區(qū)表

Flink 1.12 支持了 Hive 最新的分區(qū)作為時態(tài)表的功能,可以通過 SQL 的方式直接關聯(lián) Hive 分區(qū)表的最新分區(qū),并且會自動監(jiān)聽最新的 Hive 分區(qū),當監(jiān)控到新的分區(qū)后,會自動地做維表數(shù)據(jù)的全量替換。通過這種方式,用戶無需編寫 DataStream 程序即可完成 Kafka 流實時關聯(lián)最新的 Hive 分區(qū)實現(xiàn)數(shù)據(jù)打寬。

具體用法:

在 Sql Client 中注冊 HiveCatalog:

vim conf/sql-client-defaults.yaml
catalogs:
 - name: hive_catalog
   type: hive
   hive-conf-dir: /disk0/soft/hive-conf/ #該目錄需要包hive-site.xml文件

創(chuàng)建 Kafka 表


CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  
   master Row

Flink 事實表與 Hive 最新分區(qū)數(shù)據(jù)關聯(lián)

dim_extend_shop_info 是 Hive 中已存在的表,所以我們用 table hint 動態(tài)地開啟維表參數(shù)。


CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
SELECT * FROM  
(select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,  
    ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
   from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
      JOIN hive_catalog.flink_db.dim_extend_shop_info  
 + OPTIONS('streaming-source.enable'='true',  
    'streaming-source.partition.include' = 'latest',  
    'streaming-source.monitor-interval' = '1 h',
    'streaming-source.partition-order' = 'partition-name')
   FOR SYSTEM_TIME AS OF t1.proctime AS t2 --時態(tài)表  
   ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
   where groupID in (202042)) t  where t.rn = 1

參數(shù)解釋:

streaming-source.enable 開啟流式讀取 Hive 數(shù)據(jù)。

streaming-source.partition.include 有以下兩個值:

latest 屬性: 只讀取最新分區(qū)數(shù)據(jù)。all: 讀取全量分區(qū)數(shù)據(jù) ,默認值為 all,表示讀所有分區(qū),latest 只能用在 temporal join 中,用于讀取最新分區(qū)作為維表,不能直接讀取最新分區(qū)數(shù)據(jù)。

streaming-source.monitor-interval 監(jiān)聽新分區(qū)生成的時間、不宜過短 、最短是1 個小時,因為目前的實現(xiàn)是每個 task 都會查詢 metastore,高頻的查可能會對metastore 產生過大的壓力。需要注意的是,1.12.1 放開了這個限制,但仍建議按照實際業(yè)務不要配個太短的 interval。

streaming-source.partition-order 分區(qū)策略,主要有以下 3 種,其中最為推薦的是 partition-name:

partition-name 使用默認分區(qū)名稱順序加載最新分區(qū)create-time 使用分區(qū)文件創(chuàng)建時間順序partition-time 使用分區(qū)時間順序六、Flink 狀態(tài)管理

我們前面寫的 wordcount 的例子,沒有包含狀態(tài)管理。如果一個task在處理過程中掛掉了,那么它在內存中的狀態(tài)都會丟失,所有的數(shù)據(jù)都需要重新計算。從容錯和消息處理的語義上(at least once, exactly once),Flink引入了state和checkpoint。

因此可以說flink因為引入了state和checkpoint所以才支持的exactly once

首先區(qū)分一下兩個概念:

state:

state一般指一個具體的task/operator的狀態(tài):

state數(shù)據(jù)默認保存在java的堆內存中,TaskManage節(jié)點的內存中。

operator表示一些算子在運行的過程中會產生的一些中間結果。

checkpoint:

checkpoint可以理解為checkpoint是把state數(shù)據(jù)定時持久化存儲了,則表示了一個Flink Job在一個特定時刻的一份全局狀態(tài)快照,即包含了所有task/operator的狀態(tài)。

注意:task(subTask)是Flink中執(zhí)行的基本單位。operator指算子(transformation)

State可以被記錄,在失敗的情況下數(shù)據(jù)還可以恢復。

Flink中有兩種基本類型的State:

Keyed State

Operator State

Keyed State和Operator State,可以以兩種形式存在:

原始狀態(tài)(raw state)

托管狀態(tài)(managed state)

托管狀態(tài)是由Flink框架管理的狀態(tài)。

我們說operator算子保存了數(shù)據(jù)的中間結果,中間結果保存在什么類型中,如果我們這里是托管狀態(tài),則由flink框架自行管理

原始狀態(tài)由用戶自行管理狀態(tài)具體的數(shù)據(jù)結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態(tài)內容,對其內部數(shù)據(jù)結構一無所知。

通常在DataStream上的狀態(tài)推薦使用托管的狀態(tài),當實現(xiàn)一個用戶自定義的operator時,會使用到原始狀態(tài)。

1. State-Keyed State

基于KeyedStream上的狀態(tài)。這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解為分區(qū)過的Operator State。

保存state的數(shù)據(jù)結構:

ValueState:即類型為T的單值狀態(tài)。這個狀態(tài)與對應的key綁定,是最簡單的狀態(tài)了。它可以通過update方法更新狀態(tài)值,通過value()方法獲取狀態(tài)值。

ListState:即key上的狀態(tài)值為一個列表?梢酝ㄟ^add方法往列表中附加值;也可以通過get()方法返回一個Iterable來遍歷狀態(tài)值。

ReducingState:這種狀態(tài)通過用戶傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最后合并到一個單一的狀態(tài)值。

MapState

需要注意的是,以上所述的State對象,僅僅用于與狀態(tài)進行交互(更新、刪除、清空等),而真正的狀態(tài)值,有可能是存在內存、磁盤、或者其他分布式存儲系統(tǒng)中。相當于我們只是持有了這個狀態(tài)的句柄。

1. ValueState

使用ValueState保存中間結果對下面數(shù)據(jù)進行分組求和。

開發(fā)步驟:

1. 獲取流處理執(zhí)行環(huán)境
 2. 加載數(shù)據(jù)源
 3. 數(shù)據(jù)分組
 4. 數(shù)據(jù)轉換,定義ValueState,保存中間結果
 5. 數(shù)據(jù)打印
 6. 觸發(fā)執(zhí)行

ValueState:測試數(shù)據(jù)源:

List(
  (1L, 4L),
  (2L, 3L),
  (3L, 1L),
  (1L, 2L),
  (3L, 2L),
  (1L, 2L),
  (2L, 2L),
  (2L, 9L)
)

示例代碼:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
object TestKeyedState {
 class CountWithKeyedState extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
   *
    * ValueState狀態(tài)句柄. 第一個值為count,第二個值為sum。
   
   private var sum: ValueState[(Long, Long)] = _
   override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
     // 獲取當前狀態(tài)值
     val tmpCurrentSum: (Long, Long) = sum.value
     // 狀態(tài)默認值
     val currentSum = if (tmpCurrentSum != null) {
       tmpCurrentSum
     } else {
       (0L, 0L)
     }
     // 更新
     val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
     // 更新狀態(tài)值
     sum.update(newSum)
     // 如果count >=3 清空狀態(tài)值,重新計算
     if (newSum._1 >= 3) {
       out.collect((input._1, newSum._2 / newSum._1))
       sum.clear()
     }
   }
   override def open(parameters: Configuration): Unit = {
     sum = getRuntimeContext.getState(
       new ValueStateDescriptor[(Long, Long)]("average", // 狀態(tài)名稱
         TypeInformation.of(new TypeHint[(Long, Long)](){}) )// 狀態(tài)類型
     )
   }
 }  
 def main(args: Array[String]): Unit = {
   //初始化執(zhí)行環(huán)境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //構建數(shù)據(jù)源
   val inputStream: DataStream[(Long, Long)] = env.fromCollection(
     List(
       (1L, 4L),
       (2L, 3L),
       (3L, 1L),
       (1L, 2L),
       (3L, 2L),
       (1L, 2L),
       (2L, 2L),
       (2L, 9L))
   )
   //執(zhí)行數(shù)據(jù)處理
   inputStream.keyBy(0)
     .flatMap(new CountWithKeyedState)
     .setParallelism(1)
     .print
   //運行任務
   env.execute
 }
}  
2. MapState

使用MapState保存中間結果對下面數(shù)據(jù)進行分組求和:

1. 獲取流處理執(zhí)行環(huán)境
 2. 加載數(shù)據(jù)源
 3. 數(shù)據(jù)分組
 4. 數(shù)據(jù)轉換,定義MapState,保存中間結果
 5. 數(shù)據(jù)打印
 6. 觸發(fā)執(zhí)行

MapState:測試數(shù)據(jù)源:

List(
  ("java", 1),
  ("python", 3),
  ("java", 2),
  ("scala", 2),
  ("python", 1),
  ("java", 1),
  ("scala", 2)
)  

示例代碼:

object MapState {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   *
     * 使用MapState保存中間結果對下面數(shù)據(jù)進行分組求和
     * 1.獲取流處理執(zhí)行環(huán)境
     * 2.加載數(shù)據(jù)源
     * 3.數(shù)據(jù)分組
     * 4.數(shù)據(jù)轉換,定義MapState,保存中間結果
     * 5.數(shù)據(jù)打印
     * 6.觸發(fā)執(zhí)行
     
   val source: DataStream[(String, Int)] = env.fromCollection(List(
     ("java", 1),
     ("python", 3),
     ("java", 2),
     ("scala", 2),
     ("python", 1),
     ("java", 1),
     ("scala", 2)))
 
   source.keyBy(0)
     .map(new RichMapFunction[(String, Int), (String, Int)] {
       var mste: MapState[String, Int] = _
       override def open(parameters: Configuration): Unit = {
         val msState = new MapStateDescriptor[String, Int]("ms",
           TypeInformation.of(new TypeHint[(String)] {}),
           TypeInformation.of(new TypeHint[(Int)] {}))
         mste = getRuntimeContext.getMapState(msState)
       }
       override def map(value: (String, Int)): (String, Int) = {
         val i: Int = mste.get(value._1)
         mste.put(value._1, value._2 + i)
         (value._1, value._2 + i)
       }
     }).print()
   env.execute()
 }
}  
2. State-Operator State

與Key無關的State,與Operator綁定的state,整個operator只對應一個state。

保存state的數(shù)據(jù)結構:

ListState

舉例來說,Flink中的 Kafka Connector,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射。

步驟:

獲取執(zhí)行環(huán)境

設置檢查點機制:路徑,重啟策略

自定義數(shù)據(jù)源

需要繼承并行數(shù)據(jù)源和CheckpointedFunction設置listState,通過上下文對象context獲取數(shù)據(jù)處理,保留offset制作快照

數(shù)據(jù)打印

觸發(fā)執(zhí)行

示例代碼:

import java.util
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object ListOperate {
 def main(args: Array[String]): Unit = {
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.setParallelism(1)
   env.enableCheckpointing(5000)
   env.setStateBackend(new FsStateBackend("hdfs://node01:8020/tmp/check/8"))
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
   env.getCheckpointConfig.setCheckpointTimeout(60000)
   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
   env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   //重啟策略
   env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(1), Time.seconds(5)))
   //模擬kakfa偏移量
   env.addSource(new MyRichParrelSourceFun)
     .print()
   env.execute()
 }
}
class MyRichParrelSourceFun extends RichParallelSourceFunction[String]
 with CheckpointedFunction {
 var listState: ListState[Long] = _
 var offset: Long = 0L
 //任務運行
 override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
   val iterState: util.Iterator[Long] = listState.get().iterator()
   while (iterState.hasNext) {
     offset = iterState.next()
   }
   while (true) {
     offset += 1
     ctx.collect("offset:"+offset)
     Thread.sleep(1000)
     if(offset > 10){
       1/0
     }
   }
 }
 //取消任務
 override def cancel(): Unit = ???
 //制作快照
 override def snapshotState(context: FunctionSnapshotContext): Unit = {
   listState.clear()
   listState.add(offset)
 }
 //初始化狀態(tài)
 override def initializeState(context: FunctionInitializationContext): Unit = {
   listState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long](
     "listState", TypeInformation.of(new TypeHint[Long] {})
   ))
 }
}
3. Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在開發(fā)過程中,如果遇到需要下發(fā)/廣播配置、規(guī)則等低吞吐事件流到下游所有 task 時,就可以使用 Broadcast State 特性。下游的 task 接收這些配置、規(guī)則并保存為 BroadcastState, 將這些配置應用到另一個數(shù)據(jù)流的計算中 。

1) API介紹

通常,我們首先會創(chuàng)建一個Keyed或Non-Keyed的Data Stream,然后再創(chuàng)建一個Broadcasted Stream,最后通過Data Stream來連接(調用connect方法)到Broadcasted Stream上,這樣實現(xiàn)將Broadcast State廣播到Data Stream下游的每個Task中。

如果Data Stream是Keyed Stream,則連接到Broadcasted Stream后,添加處理ProcessFunction時需要使用KeyedBroadcastProcessFunction來實現(xiàn),下面是KeyedBroadcastProcessFunction的API,代碼如下所示:

public abstract class KeyedBroadcastProcessFunction

上面泛型中的各個參數(shù)的含義,說明如下:

KS:表示Flink程序從最上游的Source Operator開始構建Stream,當調用keyBy時所依賴的Key的類型;IN1:表示非Broadcast的Data Stream中的數(shù)據(jù)記錄的類型;IN2:表示Broadcast Stream中的數(shù)據(jù)記錄的類型;OUT:表示經過KeyedBroadcastProcessFunction的processElement()和processBroadcastElement()方法處理后輸出結果數(shù)據(jù)記錄的類型。

如果Data Stream是Non-Keyed Stream,則連接到Broadcasted Stream后,添加處理ProcessFunction時需要使用BroadcastProcessFunction來實現(xiàn),下面是BroadcastProcessFunction的API,代碼如下所示:

public abstract class BroadcastProcessFunction

上面泛型中的各個參數(shù)的含義,與前面KeyedBroadcastProcessFunction的泛型類型中的后3個含義相同,只是沒有調用keyBy操作對原始Stream進行分區(qū)操作,就不需要KS泛型參數(shù)。

注意事項:

Broadcast State 是Map類型,即K-V類型。

Broadcast State 只有在廣播一側的方法中processBroadcastElement可以修改;在非廣播一側方法中processElement只讀。

Broadcast State在運行時保存在內存中。

2) 場景舉例

動態(tài)更新計算規(guī)則: 如事件流需要根據(jù)最新的規(guī)則進行計算,則可將規(guī)則作為廣播狀態(tài)廣播到下游Task中。

實時增加額外字段: 如事件流需要實時增加用戶的基礎信息,則可將用戶的基礎信息作為廣播狀態(tài)廣播到下游Task中。

七、Flink的容錯1. Checkpoint介紹

checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現(xiàn)故障時,能夠將整個應用流圖的狀態(tài)恢復到故障之前的某一狀態(tài),保 證應用流圖狀態(tài)的一致性。Flink的checkpoint機制原理來自“Chandy-Lamport algorithm”算法。

每個需要checkpoint的應用在啟動時,Flink的JobManager為其創(chuàng)建一個 CheckpointCoordinator(檢查點協(xié)調器),CheckpointCoordinator全權負責本應用的快照制作。

CheckpointCoordinator(檢查點協(xié)調器) 周期性的向該流應用的所有source算子發(fā)送 barrier(屏障)。

當某個source算子收到一個barrier時,便暫停數(shù)據(jù)處理過程,然后將自己的當前狀態(tài)制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自己快照制作情況,同時向自身所有下游算子廣播該barrier,恢復數(shù)據(jù)處理

下游算子收到barrier之后,會暫停自己的數(shù)據(jù)處理過程,然后將自身的相關狀態(tài)制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自身快照情況,同時向自身所有下游算子廣播該barrier,恢復數(shù)據(jù)處理。

每個算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。

當CheckpointCoordinator收到所有算子的報告之后,認為該周期的快照制作成功; 否則,如果在規(guī)定的時間內沒有收到所有算子的報告,則認為本周期快照制作失敗。

如果一個算子有兩個輸入源,則暫時阻塞先收到barrier的輸入源,等到第二個輸入源相 同編號的barrier到來時,再制作自身快照并向下游廣播該barrier。具體如下圖所示:

假設算子C有A和B兩個輸入源

在第i個快照周期中,由于某些原因(如處理時延、網絡時延等)輸入源A發(fā)出的 barrier 先到來,這時算子C暫時將輸入源A的輸入通道阻塞,僅收輸入源B的數(shù)據(jù)。

當輸入源B發(fā)出的barrier到來時,算子C制作自身快照并向 CheckpointCoordinator 報告自身的快照制作情況,然后將兩個barrier合并為一個,向下游所有的算子廣播。

當由于某些原因出現(xiàn)故障時,CheckpointCoordinator通知流圖上所有算子統(tǒng)一恢復到某個周期的checkpoint狀態(tài),然后恢復數(shù)據(jù)流處理。分布式checkpoint機制保證了數(shù)據(jù)僅被處理一次(Exactly Once)。

2. 持久化存儲1) MemStateBackend

該持久化存儲主要將快照數(shù)據(jù)保存到JobManager的內存中,僅適合作為測試以及快照的數(shù)據(jù)量非常小時使用,并不推薦用作大規(guī)模商業(yè)部署。

MemoryStateBackend 的局限性:

默認情況下,每個狀態(tài)的大小限制為 5 MB。可以在MemoryStateBackend的構造函數(shù)中增加此值。

無論配置的最大狀態(tài)大小如何,狀態(tài)都不能大于akka幀的大小(請參閱配置)。

聚合狀態(tài)必須適合 JobManager 內存。

建議MemoryStateBackend 用于:

本地開發(fā)和調試。

狀態(tài)很少的作業(yè),例如僅包含一次記錄功能的作業(yè)(Map,FlatMap,Filter,...),kafka的消費者需要很少的狀態(tài)。

2) FsStateBackend

該持久化存儲主要將快照數(shù)據(jù)保存到文件系統(tǒng)中,目前支持的文件系統(tǒng)主要是 HDFS和本地文件。如果使用HDFS,則初始化FsStateBackend時,需要傳入以 “hdfs://”開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,則需要傳入以“file://”開頭的路徑(即:new FsStateBackend("file:///Data"))。在分布式情況下,不推薦使用本地文件。如果某 個算子在節(jié)點A上失敗,在節(jié)點B上恢復,使用本地文件時,在B上無法讀取節(jié)點 A上的數(shù)據(jù),導致狀態(tài)恢復失敗。

建議FsStateBackend:

具有大狀態(tài),長窗口,大鍵 / 值狀態(tài)的作業(yè)。

所有高可用性設置。

3) RocksDBStateBackend

RocksDBStatBackend介于本地文件和HDFS之間,平時使用RocksDB的功能,將數(shù) 據(jù)持久化到本地文件中,當制作快照時,將本地數(shù)據(jù)制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用戶特別指明,只需在初始化時傳入HDFS 或本地路徑即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

如果用戶使用自定義窗口(window),不推薦用戶使用RocksDBStateBackend。在自定義窗口中,狀態(tài)以ListState的形式保存在StatBackend中,如果一個key值中有多個value值,則RocksDB讀取該種ListState非常緩慢,影響性能。用戶可以根據(jù)應用的具體情況選擇FsStateBackend+HDFS或RocksStateBackend+HDFS。

4) 語法val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// 設置checkpoint的執(zhí)行模式,最多執(zhí)行一次或者至少執(zhí)行一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 設置checkpoint的超時時間
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 如果在只做快照過程中出現(xiàn)錯誤,是否讓整體任務失敗:true是  false不是
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
//設置同一時間有多少 個checkpoint可以同時執(zhí)行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
5) 修改State Backend的兩種方式

第一種:單任務調整

修改當前任務代碼

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

或者new MemoryStateBackend()

或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】

第二種:全局調整

修改flink-conf.yaml

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

6) Checkpoint的高級選項

默認checkpoint功能是disabled的,想要使用的時候需要先啟用checkpoint開啟之后,默認的checkPointMode是Exactly-once

//配置一秒鐘開啟一個checkpoint
env.enableCheckpointing(1000)
//指定checkpoint的執(zhí)行模式
//兩種可選:
//CheckpointingMode.EXACTLY_ONCE:默認值
//CheckpointingMode.AT_LEAST_ONCE
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
一般情況下選擇CheckpointingMode.EXACTLY_ONCE,除非場景要求極低的延遲(幾毫秒)
注意:如果需要保證EXACTLY_ONCE,source和sink要求必須同時保證EXACTLY_ONCE
//如果程序被cancle,保留以前做的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
默認情況下,檢查點不被保留,僅用于在故障中恢復作業(yè),可以啟用外部持久化檢查點,同時指定保留策略:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作業(yè)取消時保留檢查點,注意,在這種情況下,您必須在取消后手動清理檢查點狀態(tài)
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:當作業(yè)在被cancel時,刪除檢查點,檢查點僅在作業(yè)失敗時可用
//設置checkpoint超時時間
env.getCheckpointConfig.setCheckpointTimeout(60000)
//Checkpointing的超時時間,超時時間內沒有完成則被終止
//Checkpointing最小時間間隔,用于指定上一個checkpoint完成之后
//最小等多久可以觸發(fā)另一個checkpoint,當指定這個參數(shù)時,maxConcurrentCheckpoints的值為1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//設置同一個時間是否可以有多個checkpoint執(zhí)行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
指定運行中的checkpoint最多可以有多少個
env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
用于指定在checkpoint發(fā)生異常的時候,是否應該fail該task,默認是true,如果設置為false,則task會拒絕checkpoint然后繼續(xù)運行
2. Flink的重啟策略

Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟。集群可以通過默認的重啟策略來重啟,這個默認的重啟策略通常在未指定重啟策略的情況下使用,而如果Job提交的時候指定了重啟策略,這個重啟策略就會覆蓋掉集群的默認重啟策略。

1) 概覽

默認的重啟策略是通過Flink的 flink-conf.yaml 來指定的,這個配置參數(shù) restart-strategy 定義了哪種策略會被采用。如果checkpoint未啟動,就會采用 no restart 策略,如果啟動了checkpoint機制,但是未指定重啟策略的話,就會采用 fixed-delay 策略,重試 Integer.MAX_VALUE 次。請參考下面的可用重啟策略來了解哪些值是支持的。

每個重啟策略都有自己的參數(shù)來控制它的行為,這些值也可以在配置文件中設置,每個重啟策略的描述都包含著各自的配置值信息。

重啟策略重啟策略值Fixed delayfixed-delayFailure ratefailure-rateNo restartNone

除了定義一個默認的重啟策略之外,你還可以為每一個Job指定它自己的重啟策略,這個重啟策略可以在 ExecutionEnvironment 中調用 setRestartStrategy() 方法來程序化地調用,注意這種方式同樣適用于 StreamExecutionEnvironment。

下面的例子展示了如何為Job設置一個固定延遲重啟策略,一旦有失敗,系統(tǒng)就會嘗試每10秒重啟一次,重啟3次。

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 3, // 重啟次數(shù)
 Time.of(10, TimeUnit.SECONDS) // 延遲時間間隔
))
2) 固定延遲重啟策略(Fixed Delay Restart Strategy)

固定延遲重啟策略會嘗試一個給定的次數(shù)來重啟Job,如果超過了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會等待一個固定的時間。

重啟策略可以配置flink-conf.yaml的下面配置參數(shù)來啟用,作為默認的重啟策略:

restart-strategy: fixed-delay
配置參數(shù)描述默認值restart-strategy.fixed-delay.attempts在Job最終宣告失敗之前,Flink嘗試執(zhí)行的次數(shù)1,如果啟用checkpoint的話是Integer.MAX_VALUErestart-strategy.fixed-delay.delay延遲重啟意味著一個執(zhí)行失敗之后,并不會立即重啟,而是要等待一段時間。akka.ask.timeout,如果啟用checkpoint的話是1s

例子:

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

固定延遲重啟也可以在程序中設置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
 3, // 重啟次數(shù)
 Time.of(10, TimeUnit.SECONDS) // 重啟時間間隔
))
3) 失敗率重啟策略

失敗率重啟策略在Job失敗后會重啟,但是超過失敗率后,Job會最終被認定失敗。在兩個連續(xù)的重啟嘗試之間,重啟策略會等待一個固定的時間。

<上一頁  1  2  3  4  下一頁>  余下全文
聲明: 本文由入駐維科號的作者撰寫,觀點僅代表作者本人,不代表OFweek立場。如有侵權或其他問題,請聯(lián)系舉報。

發(fā)表評論

0條評論,0人參與

請輸入評論內容...

請輸入評論/評論長度6~500個字

您提交的評論過于頻繁,請輸入驗證碼繼續(xù)

暫無評論

暫無評論

人工智能 獵頭職位 更多
掃碼關注公眾號
OFweek人工智能網
獲取更多精彩內容
文章糾錯
x
*文字標題:
*糾錯內容:
聯(lián)系郵箱:
*驗 證 碼:

粵公網安備 44030502002758號