/**
 * main() defines and executes the DataStream program.
 *
 * Keys the sensor stream by sensor id, applies a `TemperatureAlertFunction`
 * with a threshold of 1.7, prints the resulting alerts and runs the job.
 *
 * @param args command-line arguments (not used in the visible code)
 */
def main(args: Array[String]): Unit = {

  // Unimportant code omitted in this excerpt: set-up of the execution environment.
  val env = ...

  // Unimportant code omitted in this excerpt: creation of the sensor stream.
  val sensorData: DataStream[SensorReading] = ...

  // Partition the stream by sensor id first.
  val keyedSensorData: KeyedStream[SensorReading, String] = sensorData.keyBy(_.id)

  // Apply the flatMap function on the keyed stream.
  val alerts: DataStream[(String, Double, Double)] = keyedSensorData
    .flatMap(new TemperatureAlertFunction(1.7))

  // print result stream to standard out
  alerts.print()

  // execute application
  env.execute("Generate Temperature Alerts")
}
} // closes the enclosing definition that was opened before this excerpt
/**
 * The function emits an alert if the temperature measurement of a sensor changed by more than
 * a configured threshold compared to the last reading.
 *
 * Each emitted alert is a tuple of (sensor id, current temperature, absolute difference
 * to the previously stored temperature).
 *
 * @param threshold The threshold to raise an alert.
 */
class TemperatureAlertFunction(val threshold: Double)
    extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  // State handle that stores the temperature of the previous reading.
  private var lastTempState: ValueState[Double] = _

  /**
   * Initialisation: registers the "lastTemp" state descriptor and obtains the
   * state handle from the runtime context.
   *
   * @param parameters configuration passed in by the caller (not read here)
   */
  override def open(parameters: Configuration): Unit = {
    // create state descriptor
    val lastTempDescriptor =
      new ValueStateDescriptor[Double]("lastTemp", classOf[Double])
    // obtain the state handle
    lastTempState = getRuntimeContext.getState[Double](lastTempDescriptor)
  }

  /**
   * Compares the reading with the previously stored temperature, emits an alert when the
   * absolute difference exceeds `threshold`, and then stores the current temperature.
   *
   * @param reading the incoming sensor reading
   * @param out     collector that receives (id, temperature, difference) alerts
   */
  override def flatMap(
      reading: SensorReading,
      out: Collector[(String, Double, Double)]): Unit = {
    // fetch the last temperature from state
    // NOTE(review): for the first reading nothing has been stored yet; presumably
    // value() then yields 0.0, so the first reading is compared against 0.0 — confirm.
    val lastTemp = lastTempState.value()
    // check if we need to emit an alert
    val tempDiff = (reading.temperature - lastTemp).abs
    if (tempDiff > threshold) {
      // temperature changed by more than the threshold
      out.collect((reading.id, reading.temperature, tempDiff))
    }

    // update lastTemp state
    this.lastTempState.update(reading.temperature)
  }
}
// Temperature alerts expressed with flatMapWithState.
// The partial function receives (current event, current state) and returns
// (results to emit, state value to store afterwards).
val alerts: DataStream[(String, Double, Double)] = keyedData
  .flatMapWithState[(String, Double, Double), Double] {

    // First event: there is no previous temperature to compare against,
    // so nothing is emitted and the temperature is only recorded.
    case (first: SensorReading, None) =>
      (List.empty, Some(first.temperature))

    // Every later event: compare against the stored temperature.
    case (current: SensorReading, Some(previousTemp)) =>
      val tempDiff = (current.temperature - previousTemp).abs
      // An alert is emitted only when the difference exceeds 1.7;
      // the stored temperature is replaced in either case.
      val emitted =
        if (tempDiff > 1.7) List((current.id, current.temperature, tempDiff))
        else List.empty
      (emitted, Some(current.temperature))
  }
/**
 * Counts, per parallel subtask, how many readings exceeded a temperature threshold and
 * emits (subtask index, running count) for every such reading. The count is exposed to
 * checkpoints through the ListCheckpointed interface.
 *
 * @param threshold temperature above which a reading is counted
 */
class HighTempCounter(val threshold: Double)
    extends RichFlatMapFunction[SensorReading, (Int, Long)]
    with ListCheckpointed[java.lang.Long] {

  // Index of the subtask running this function instance.
  private lazy val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask

  // Local counter variable (the local state of this task).
  private var highTempCnt = 0L

  /**
   * Increments the local counter and emits it whenever the reading's temperature
   * is above the threshold; other readings produce no output.
   *
   * @param in  the incoming sensor reading
   * @param out collector receiving (subtask index, counter) updates
   */
  override def flatMap(in: SensorReading, out: Collector[(Int, Long)]): Unit = {
    val exceeded = in.temperature > threshold
    if (exceeded) {
      highTempCnt += 1
      out.collect((subtaskIdx, highTempCnt))
    }
  }

  /**
   * Returns the snapshot of the local state as a list holding the single counter.
   *
   * @param chkpntId checkpoint id
   * @param ts       checkpoint timestamp
   * @return the local counter wrapped in a one-element list
   */
  override def snapshotState(
      chkpntId: Long,
      ts: Long): java.util.List[java.lang.Long] =
    java.util.Collections.singletonList(highTempCnt)

  /**
   * Restores the local counter as the sum of all values in the given list.
   *
   * @param state the list of counts to restore from
   */
  override def restoreState(state: java.util.List[java.lang.Long]): Unit = {
    highTempCnt = state.asScala.foldLeft(0L)((total, cnt) => total + cnt)
  }
}
7.1.2.1 为什么要把算子状态当作列表来处理呢?
看完上个示例,您可能想知道为什么算子状态要作为状态对象列表(a list of state objects)来处理。这是因为列表结构支持改变带算子状态的函数的并行度。要增加或减少带算子状态的函数的并行度,就需要把算子状态重新分配给更多或更少的任务实例,这就要求对状态对象进行分割或合并;而一般来说,列表比单个值更适合分割和合并。
// Obtain the read-only broadcast state that maps sensor ids to thresholds.
val thresholds = readOnlyCtx.getBroadcastState(thresholdStateDescriptor)

// Only readings whose sensor id has a threshold entry can raise an alert.
if (thresholds.contains(reading.id)) {
  // threshold configured for this sensor
  val sensorThreshold: Double = thresholds.get(reading.id)

  // absolute difference to the last temperature held in state
  val tempDiff = (reading.temperature - lastTempState.value()).abs

  // temperature changed by more than the sensor's threshold: emit an alert
  if (tempDiff > sensorThreshold) {
    out.collect((reading.id, reading.temperature, tempDiff))
  }
}
// emit new counters 输出 out.collect((v.id, keyHighTempCnt, opHighTempCnt)) } }
/**
 * Initialisation: obtains the keyed-state and operator-state handles and seeds the
 * local counter from the operator state.
 *
 * @param initContext context giving access to the keyed and operator state stores
 */
override def initializeState(initContext: FunctionInitializationContext): Unit = {

  // keyed state: value state registered as "keyedCnt"
  keyedCntState = initContext.getKeyedStateStore.getState(
    new ValueStateDescriptor[Long]("keyedCnt", classOf[Long]))

  // operator state: list state registered as "opCnt"
  opCntState = initContext.getOperatorStateStore.getListState(
    new ListStateDescriptor[Long]("opCnt", classOf[Long]))

  // the local variable starts as the sum of all entries in the operator state
  opHighTempCnt = opCntState.get().asScala.sum
}
/**
 * Snapshot hook: writes the locally held counter into the operator state so that
 * the operator state contains exactly that single value.
 *
 * @param snapshotContext snapshot context (not read here)
 */
override def snapshotState(snapshotContext: FunctionSnapshotContext): Unit = {
  // drop whatever the list state held before ...
  opCntState.clear()
  // ... and store the current local counter as its only element
  opCntState.add(opHighTempCnt)
}
} // closes the enclosing class whose header lies before this excerpt
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Maximum parallelism for the whole application.
env.setMaxParallelism(512)

// Maximum parallelism for a single operator: the value set on the operator
// overrides the application-wide value.
val alerts: DataStream[(String, Double, Double)] =
  keyedSensorData
    .flatMap(new TemperatureAlertFunction(1.1))
    .setMaxParallelism(1024)
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Placeholder: the checkpoint location is left unspecified in this snippet.
val checkpointPath: String = ???

// Create a RocksDB state backend instance for that path ...
val backend = new RocksDBStateBackend(checkpointPath)

// ... and configure the environment to use it.
env.setStateBackend(backend)
// Maximum temperature per sensor, computed over 10-second windows.
val tenSecsMaxTemps: DataStream[(String, Double)] =
  sensorData
    // keep only (sensor id, temperature)
    .map(r => (r.id, r.temperature))
    // group by sensor id (first tuple field)
    .keyBy(_._1)
    // 10-second windows, maximum of the temperature (tuple field 1)
    .timeWindow(Time.seconds(10))
    .max(1)

// Publish the per-sensor maximum of the last 10 seconds as queryable state
// under the name "maxTemperature", keyed by sensor id.
tenSecsMaxTemps
  .keyBy(_._1)
  .asQueryableState("maxTemperature")
/**
 * Console dashboard that repeatedly queries the "maxTemperature" queryable state of a
 * running job for a fixed set of sensor keys and prints one table row per refresh.
 */
object TemperatureDashboard {

  // assume local setup and TM runs on same machine as client
  val proxyHost = "127.0.0.1"
  val proxyPort = 9069

  // jobId of running QueryableStateJob
  // can be looked up in logs of running job or the web UI
  val jobId = "d2447b1a5e0d952c372064c886d2220a"

  // how many sensors to query
  val numSensors = 5
  // how often to query the state (value is handed to Thread.sleep)
  val refreshInterval = 10000

  /**
   * Prints a header line and then loops forever: sends one asynchronous query per
   * sensor, waits for all results and prints them as one row.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {

    // configure client with host and port of queryable state proxy
    val client = new QueryableStateClient(proxyHost, proxyPort)

    val futures =
      new Array[CompletableFuture[ValueState[(String, Double)]]](numSensors)
    val results = new Array[Double](numSensors)

    // print header line of dashboard table
    val header =
      (for (i <- 0 until numSensors) yield "sensor_" + (i + 1))
        .mkString("\t| ")
    println(header)

    // The loop never terminates normally, so the client is shut down in a finally
    // block; this way it is released when the loop is left through an exception
    // (e.g. a failed query or an interrupted sleep).
    try {
      // loop forever
      while (true) {

        // send out async queries
        for (i <- 0 until numSensors) {
          futures(i) = queryState("sensor_" + (i + 1), client)
        }
        // wait for results; the second tuple field is the temperature
        for (i <- 0 until numSensors) {
          results(i) = futures(i).get().value()._2
        }
        // print result
        val line = results.map(t => f"$t%1.3f").mkString("\t| ")
        println(line)

        // wait to send out next queries
        Thread.sleep(refreshInterval)
      }
    } finally {
      client.shutdownAndWait()
    }
  }

  /**
   * Sends an asynchronous query for the "maxTemperature" state of one key.
   *
   * @param key    the key to look up (a sensor id)
   * @param client the queryable state client used to send the request
   * @return a future that completes with the state for the given key
   */
  def queryState(
      key: String,
      client: QueryableStateClient)
    : CompletableFuture[ValueState[(String, Double)]] = {

    client
      .getKvState[String, ValueState[(String, Double)], (String, Double)](
        JobID.fromHexString(jobId), // job id
        "maxTemperature",           // name of the queryable state
        key,                        // key to query
        Types.STRING,               // type of the key
        new ValueStateDescriptor[(String, Double)]( // state descriptor
          "", // state name not relevant here
          Types.TUPLE[(String, Double)]))
  }
}