Garland +

flink 动态 schema 写 parquet

背景

之前我司的日志处理流程是,日志落盘到文件,spark 每天定时任务去清洗日志,生成 parquet 然后从 hive 里读取,由于之前的日志 一直没有统一的 schema,相当于每一个新打点都得写一个新的解析操作,然后去 hive 建表这种,是一种批处理的逻辑。

现在上了 flink 以及新的打点系统就打算把这一套用 flink 实现一下,期望如下:

  1. flink 实时将日志写到 parquet
  2. 新加打点 schema 能够动态感知,不需要重启任务
  3. hive 那边能实时读取

目标有了,那就开搞

实现

新的日志还是走 kafka,但是有一个不同的是新的日志有一个服务来维护所有打点的 schema

翻了下官方文档 flink 是支持 sink 到 parquet 的,flink 的 Streaming File Sink[1] 支持写入到 parquet 这种列式存储, 需要注意的是列式存储的存储策略是 OnCheckpointRollingPolicy, 也就是在 checkpoint 的时候去完成这个周期的写入,而行存储 模式可以根据文件大小来设置写入策略。

猜想原因应该是为了保证一致性,行存储其实是无脑追加,程序只用维护一个文件的 offset,在这个周期如果发生异常就直接 trim 掉 offset 后的数据然后重新写入即可,而列存储的话相当于维护了一个转置的表格,由于同类型数据都放在一起,程序很难像行存储一样去维护这样一个 offset,所以只能是每个 checkpoint 周期开启一个新的文件进行写入,发生异常回滚整个文件这种操作。

flink 提供了 StreamingFileSink.fowBulkFormat 这样的方法来写列存储,具体参数如下

/**
 * Creates the builder for a {@link StreamingFileSink} with row-encoding format.
 * @param basePath the base path where all the buckets are going to be created as sub-directories.
 * @param writerFactory the {@link BulkWriter.Factory} to be used when writing elements in the buckets.
 * @param <IN> the type of incoming elements
 * @return The builder where the remaining of the configuration parameters for the sink can be configured.
 * In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters.
 */
public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> forBulkFormat(
		final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
	return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
}

需要一个 basePath 来当根目录,需要一个 BulkWriter.Factory 类型的方法来做具体的写入操作,flink-parquet 提供了 ParquetAvroWriters 来生成这个 Factory,提供了三个方法:

另外还指出 bucket 设置,继承 BasePathBucketAssigner 重写 getBucketId 方法生成形如 k1={v1}/k2={v2} 这样的路径 即可完成分区,hive 可以直接使用岂不是美滋滋,所以一个简单的写 parquet 程序如下:

package flink.note

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters


object SinkParquetExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.enableCheckpointing(1000)

     val source =env.socketTextStream("localhost", 9999)
       .map(p => {
         val rs = p.split(',')
         Data(rs(0), rs(1).toInt
         )
       })

    val sink = StreamingFileSink
      .forBulkFormat(
        new Path("file:///Users/garland/Desktop/tmp"),
        ParquetAvroWriters.forReflectRecord(classOf[Data])
      )
      .withBucketAssigner(new CustomBucketAssigner)
      .build()

    source.addSink(sink)
    env.execute()
  }

  class CustomBucketAssigner extends BasePathBucketAssigner[Data] {
    override def getBucketId(element: Data, context: BucketAssigner.Context): String = s"name=${element.name}"
  }

  case class Data(name: String, count: Int)
}

这里需要增加下面两个依赖

val flinkParquet = "org.apache.flink" % "flink-parquet" % flinkVersion
val ParquetAvro = "org.apache.parquet" % "parquet-avro" % "1.10.0"

支持动态 schema

👆说生成 BulkWriter.Factory 可以通过 forGenericRecord 之间传入一个 Schema 类型来实现,那么支持动态 schema 只用在这个方法里动态去获取 schema 就可以了,所以可以实现一个 CustomParquetWriter 示例如下

package flink.note

import org.apache.flink.formats.parquet.ParquetBuilder
import org.apache.flink.formats.parquet.ParquetWriterFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.io.OutputFile
import java.io.IOException

import flink.note.SchemaGetter

object CustomParquetWriter {

  /**
    * Creates a ParquetWriterFactory that accepts and writes Avro generic types.
    * The Parquet writers will use the given schema to build and write the columnar data.
    *
    * @param schemaServiceURL The URL of avroSchema.
    */
  def forStringToGenericRecord(schemaServiceURL: String): ParquetWriterFactory[GenericRecord] = {
    val builder = new ParquetBuilder[GenericRecord] {
      override def createWriter(out: OutputFile): ParquetWriter[GenericRecord] = {
        // 这里进行动态获取 schema 的操作
        val schemaString = SchemaGetter.getAvroSchema(schemaServiceURL)
        createAvroParquetWriter(schemaString, GenericData.get, out)
      }
    }
    new ParquetWriterFactory[GenericRecord](builder)
  }

  @throws[IOException]
  private def createAvroParquetWriter[T](
      schemaString: String,
      dataModel: GenericData,
      out: OutputFile
  ): ParquetWriter[T] = {
    val schema: Schema = new Schema.Parser().parse(schemaString)
    AvroParquetWriter
      .builder[T](out)
      .withSchema(schema)
      .withDataModel(dataModel)
      .build()
  }
}

hive 实时读取

结论

REF

言:

Blog

Thoughts

Project