Flink 监测 a 事件后没有 b 事件
2019-05-23
最近在用 Flink CEP 监控一些日志行为之间关系的事情,发现有个问题是随着任务的运行,state 体积会越来越大,大概查了下原因, 猜想是 CEP 只能监测迟到的事件,没法监测不到的事件,比如像下面的 demo:
import java.utilimport org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}import org.apache.flink.cep.scala.CEPimport org.apache.flink.cep.scala.pattern.Patternimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}import org.apache.flink.streaming.api.windowing.time.Timeobject CEPExampleX {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.setParallelism(1)val stream = env.socketTextStream("localhost", 9999).map(p => {val r = p.split(" ")(r(0), r(1))}).keyBy(0)val pattern = Pattern.begin[(String, String)]("begin").where(_._2.contains("123")).followedBy("next").where(_._2.contains("456")).within(Time.seconds(3))CEP.pattern(stream, pattern).select(new PatternTimeoutFunction[(String, String), String] {override def timeout(pattern: util.Map[String, util.List[(String, String)]], timeoutTimestamp: Long): String = {println("timeoutPattern >>>>>>>>> ", pattern, timeoutTimestamp)""}},new PatternSelectFunction[(String, String), String] {override def select(pattern: util.Map[String, util.List[(String, String)]]): String = {println("selectPattern >>>>>>>>>> ", pattern)""}})env.execute()}}
按 key 去匹配同一个流下的序列,在 3s 的 processingTime 内匹配上 (123, 456) 这样的序列,迟到和命中就都输出出来,所以对于以下的输入
-> nc -l 9999a 123b 123a 567 (此时仅输出 a 流的 timeoutPattern)...
如果 b 流一直没有数据,那么 b 123 这条数据会一直 buffer 住,直到 b 流有数据,这也就是所谓的没法匹配不到的事件的现象,虽然
flink 只有一个全局的 watermark,但是我猜想对于 cep 在每个 keyedStream 内部还维护了一个类似语义的时间(仅是猜想,后面找时间专门研究研究)
而实际开发的时候有时候这个 keyedStream 可能是 m * n * r 这样庞大的组合,用 CEP 做事件匹配的话就会遇到大量的可能只出现
一条事件或者事件周期很长的流,那么势必会造成 CEP operator State 体积增大的情况,对于这种情况,只能合理分配 keyedStream
尽量让流里面有数据在走。
回过头来再看这个问题,如果确实想要匹配 a 事件后 xxx 时间内没有发生 b 事件 这样的行为该怎么搞呢?线上业务中有很大一部分类似的需求,比如用户提交订单10分钟还没支付要给人发个推送类似的场景,也可以用 keyedStream 配合自定义 State 和触发器实现
package flink.noteimport org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}import org.apache.flink.api.common.time.Timeimport org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.KeyedProcessFunctionimport org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}import org.apache.flink.util.Collectorobject KeyedStateTimerExample {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)env.socketTextStream("localhost", 9999).map(p => {val r = p.split(" ")(r(0), r(1), System.currentTimeMillis())}).assignAscendingTimestamps(_._3).keyBy(0).process(new KeyedTimeoutFunction).print()env.execute()}class KeyedTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String, Long), String] {private var state: ValueState[Event] = _override def open(parameters: Configuration): Unit = {super.open(parameters)val config = StateTtlConfig.newBuilder(Time.minutes(1)).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build()val descriptor = new ValueStateDescriptor("testState", classOf[Event])descriptor.enableTimeToLive(config)state = getRuntimeContext.getState(descriptor)}override def processElement(value: (String, String, Long), ctx: KeyedProcessFunction[Tuple, (String, String, Long), String]#Context, out: Collector[String]): Unit = {out.collect(value.toString())val currentTimestamp = ctx.timestamp()// 在 123 模式下设置 timer 在 3s 后检查自己有没有更新到 456value._2 match {case "123" =>ctx.timerService().registerProcessingTimeTimer(currentTimestamp + 3000)state.update(Event(value._1, value._2, currentTimestamp))case "456" =>state.update(Event(value._1, value._2, currentTimestamp))case _ =>}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String, Long), String]#OnTimerContext, out: Collector[String]): Unit = {if (state.value().data != "456" && state.value().ts + 3000 == timestamp) {out.collect(s"timeout ${state.value()}, current $timestamp")}}}case class Event(name: String, data: String, ts: Long)}
由于要用到 ctx.timestamp,所以这里用了 processingTime 当作 eventTime,processingTime 下 ctx.timestamp 会
为 null,注意这里尽管时间语意是 EventTime,为了模拟方便在设置 Timer 的时候用的是 registerProcessingTimeTimer,
keyedStream 在遇到 123 时设置 timer 让自己 3s 后检查状态有没有变更到 456,没有的话则输出 timeout 消息,结果如下
(a,123,1567582426047)(a,567,1567582427929)timeout Event(a,123,1567582426047), current 1567582431047(a,123,1567582440011)timeout Event(a,123,1567582440011), current 1567582445011(a,123,1567582448238)(a,456,1567582449994)
不过该用 CEP 的时候还是得用 CEP,对外开放的接口以及后面上 SQL,手动操作状态总是不太优雅,后面再仔细研究下 CEP。