Garland +

Flink 监测 a 事件后没有 b 事件

最近在用 Flink CEP 监控一些日志行为之间关系的事情,发现有个问题是随着任务的运行,state 体积会越来越大,大概查了下原因, 猜想是 CEP 只能监测迟到的事件,没法监测不到的事件,比如像下面的 demo:

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time


object CEPExampleX {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)

    val stream = env.socketTextStream("localhost", 9999)
      .map(p => {
        val r = p.split(" ")
        (r(0), r(1))
      })
      .keyBy(0)

    val pattern = Pattern
      .begin[(String, String)]("begin")
      .where(_._2.contains("123"))
      .followedBy("next")
      .where(_._2.contains("456"))
      .within(Time.seconds(3))

    CEP.pattern(stream, pattern)
      .select(
        new PatternTimeoutFunction[(String, String), String] {
          override def timeout(pattern: util.Map[String, util.List[(String, String)]], timeoutTimestamp: Long): String = {
            println("timeoutPattern >>>>>>>>> ", pattern, timeoutTimestamp)
            ""
          }
        },
        new PatternSelectFunction[(String, String), String] {
          override def select(pattern: util.Map[String, util.List[(String, String)]]): String = {
            println("selectPattern >>>>>>>>>> ", pattern)
            ""
          }
        }
      )
    env.execute()
  }
}

按 key 去匹配同一个流下的序列,在 3s 的 processingTime 内匹配上 (123, 456) 这样的序列,迟到和命中就都输出出来,所以对于以下的输入

-> nc -l 9999
a 123
b 123
a 567 (此时仅输出 a 流的 timeoutPattern)
... 

如果 b 流一直没有数据,那么 b 123 这条数据会一直 buffer 住,直到 b 流有数据,这也就是所谓的没法匹配不到的事件的现象,虽然 flink 只有一个全局的 watermark,但是我猜想对于 cep 在每个 keyedStream 内部还维护了一个类似语义的时间(仅是猜想,后面找时间专门研究研究)

而实际开发的时候有时候这个 keyedStream 可能是 m * n * r 这样庞大的组合,用 CEP 做事件匹配的话就会遇到大量的可能只出现 一条事件或者事件周期很长的流,那么势必会造成 CEP operator State 体积增大的情况,对于这种情况,只能合理分配 keyedStream 尽量让流里面有数据在走。

回过头来再看这个问题,如果确实想要匹配 a 事件后 xxx 时间内没有发生 b 事件 这样的行为该怎么搞呢?线上业务中有很大一部分类似的需求,比如用户提交订单10分钟还没支付要给人发个推送类似的场景,也可以用 keyedStream 配合自定义 State 和触发器实现

package flink.note

import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector


object KeyedStateTimerExample {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    env.socketTextStream("localhost", 9999)
      .map(p => {
        val r = p.split(" ")
        (r(0), r(1), System.currentTimeMillis())
      })
      .assignAscendingTimestamps(_._3)
      .keyBy(0)
      .process(new KeyedTimeoutFunction)
      .print()


    env.execute()
  }

  class KeyedTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String, Long), String] {
    private var state: ValueState[Event] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      val config = StateTtlConfig.newBuilder(Time.minutes(1))
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .build()

      val descriptor = new ValueStateDescriptor("testState", classOf[Event])
      descriptor.enableTimeToLive(config)
      state = getRuntimeContext.getState(descriptor)
    }

    override def processElement(value: (String, String, Long), ctx: KeyedProcessFunction[Tuple, (String, String, Long), String]#Context, out: Collector[String]): Unit = {
      out.collect(value.toString())

      val currentTimestamp = ctx.timestamp()
      // 在 123 模式下设置 timer 在 3s 后检查自己有没有更新到 456
      value._2 match {
        case "123" =>
          ctx.timerService().registerProcessingTimeTimer(currentTimestamp + 3000)
          state.update(Event(value._1, value._2, currentTimestamp))
        case "456" =>
          state.update(Event(value._1, value._2, currentTimestamp))
        case _ =>
      }
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String, Long), String]#OnTimerContext, out: Collector[String]): Unit = {
      if (state.value().data != "456" && state.value().ts + 3000 == timestamp) {
        out.collect(s"timeout ${state.value()}, current $timestamp")
      }
    }
  }

  case class Event(name: String, data: String, ts: Long)
}

由于要用到 ctx.timestamp,所以这里用了 processingTime 当作 eventTime,processingTime 下 ctx.timestamp 会 为 null,注意这里尽管时间语意是 EventTime,为了模拟方便在设置 Timer 的时候用的是 registerProcessingTimeTimer, keyedStream 在遇到 123 时设置 timer 让自己 3s 后检查状态有没有变更到 456,没有的话则输出 timeout 消息,结果如下

(a,123,1567582426047)
(a,567,1567582427929)
timeout Event(a,123,1567582426047), current 1567582431047
(a,123,1567582440011)
timeout Event(a,123,1567582440011), current 1567582445011
(a,123,1567582448238)
(a,456,1567582449994)

不过该用 CEP 的时候还是得用 CEP,对外开放的接口以及后面上 SQL,手动操作状态总是不太优雅,后面再仔细研究下 CEP。

言:

Blog

Thoughts

Project