Garland +

flink 动态 schema 写 parquet

之前我司的日志处理流程是,日志落盘到文件,spark 每天定时任务去清洗日志,生成 parquet 然后从 hive 里读取,由于之前的日志 一直没有统一的 schema,相当于每一个新打点都得写一个新的解析操作,然后去 hive 建表这种,是一种批处理的逻辑。

现在上了 flink 以及新的打点系统就打算把这一套用 flink 实现一下,期望如下:

  1. flink 实时将日志写到 parquet
  2. 新加打点 schema 能够动态感知,不需要重启任务
  3. hive 那边能实时读取

目标有了,那就开搞

新的日志还是走 kafka,但是有一个不同的是新的日志有一个服务来维护所有打点的 schema

翻了下官方文档 flink 是支持 sink 到 parquet 的,flink 的 Streaming File Sink[1] 支持写入到 parquet 这种列式存储, 需要注意的是列式存储的存储策略是 OnCheckpointRollingPolicy, 也就是在 checkpoint 的时候去完成这个周期的写入,而行存储 模式可以根据文件大小来设置写入策略。

猜想原因应该是为了保证一致性,行存储其实是无脑追加,程序只用维护一个文件的 offset,在这个周期如果发生异常就直接 trim 掉 offset 后的数据然后重新写入即可,而列存储的话相当于维护了一个转置的表格,由于同类型数据都放在一起,程序很难像行存储一样去维护这样一个 offset,所以只能是每个 checkpoint 周期开启一个新的文件进行写入,发生异常回滚整个文件这种操作。

flink 提供了 StreamingFileSink.fowBulkFormat 这样的方法来写列存储,具体参数如下

  1. /**
  2. * Creates the builder for a {@link StreamingFileSink} with row-encoding format.
  3. * @param basePath the base path where all the buckets are going to be created as sub-directories.
  4. * @param writerFactory the {@link BulkWriter.Factory} to be used when writing elements in the buckets.
  5. * @param <IN> the type of incoming elements
  6. * @return The builder where the remaining of the configuration parameters for the sink can be configured.
  7. * In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters.
  8. */
  9. public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> forBulkFormat(
  10. final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
  11. return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
  12. }

需要一个 basePath 来当根目录,需要一个 BulkWriter.Factory 类型的方法来做具体的写入操作,flink-parquet 提供了 ParquetAvroWriters 来生成这个 Factory,提供了三个方法:

另外还指出 bucket 设置,继承 BasePathBucketAssigner 重写 getBucketId 方法生成形如 k1={v1}/k2={v2} 这样的路径 即可完成分区,hive 可以直接使用岂不是美滋滋,所以一个简单的写 parquet 程序如下:

  1. package flink.note
  2. import org.apache.flink.streaming.api.TimeCharacteristic
  3. import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
  4. import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
  5. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
  6. import org.apache.flink.core.fs.Path
  7. import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
  8. object SinkParquetExample {
  9. def main(args: Array[String]): Unit = {
  10. val env = StreamExecutionEnvironment.getExecutionEnvironment
  11. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  12. env.enableCheckpointing(1000)
  13. val source =env.socketTextStream("localhost", 9999)
  14. .map(p => {
  15. val rs = p.split(',')
  16. Data(rs(0), rs(1).toInt
  17. )
  18. })
  19. val sink = StreamingFileSink
  20. .forBulkFormat(
  21. new Path("file:///Users/garland/Desktop/tmp"),
  22. ParquetAvroWriters.forReflectRecord(classOf[Data])
  23. )
  24. .withBucketAssigner(new CustomBucketAssigner)
  25. .build()
  26. source.addSink(sink)
  27. env.execute()
  28. }
  29. class CustomBucketAssigner extends BasePathBucketAssigner[Data] {
  30. override def getBucketId(element: Data, context: BucketAssigner.Context): String = s"name=${element.name}"
  31. }
  32. case class Data(name: String, count: Int)
  33. }

这里需要增加下面两个依赖

  1. val flinkParquet = "org.apache.flink" % "flink-parquet" % flinkVersion
  2. val ParquetAvro = "org.apache.parquet" % "parquet-avro" % "1.10.0"

👆说生成 BulkWriter.Factory 可以通过 forGenericRecord 之间传入一个 Schema 类型来实现,那么支持动态 schema 只用在这个方法里动态去获取 schema 就可以了,所以可以实现一个 CustomParquetWriter 示例如下

  1. package flink.note
  2. import org.apache.flink.formats.parquet.ParquetBuilder
  3. import org.apache.flink.formats.parquet.ParquetWriterFactory
  4. import org.apache.avro.Schema
  5. import org.apache.avro.generic.{GenericData, GenericRecord}
  6. import org.apache.parquet.avro.AvroParquetWriter
  7. import org.apache.parquet.hadoop.ParquetWriter
  8. import org.apache.parquet.io.OutputFile
  9. import java.io.IOException
  10. import flink.note.SchemaGetter
  11. object CustomParquetWriter {
  12. /**
  13. * Creates a ParquetWriterFactory that accepts and writes Avro generic types.
  14. * The Parquet writers will use the given schema to build and write the columnar data.
  15. *
  16. * @param schemaServiceURL The URL of avroSchema.
  17. */
  18. def forStringToGenericRecord(schemaServiceURL: String): ParquetWriterFactory[GenericRecord] = {
  19. val builder = new ParquetBuilder[GenericRecord] {
  20. override def createWriter(out: OutputFile): ParquetWriter[GenericRecord] = {
  21. // 这里进行动态获取 schema 的操作
  22. val schemaString = SchemaGetter.getAvroSchema(schemaServiceURL)
  23. createAvroParquetWriter(schemaString, GenericData.get, out)
  24. }
  25. }
  26. new ParquetWriterFactory[GenericRecord](builder)
  27. }
  28. @throws[IOException]
  29. private def createAvroParquetWriter[T](
  30. schemaString: String,
  31. dataModel: GenericData,
  32. out: OutputFile
  33. ): ParquetWriter[T] = {
  34. val schema: Schema = new Schema.Parser().parse(schemaString)
  35. AvroParquetWriter
  36. .builder[T](out)
  37. .withSchema(schema)
  38. .withDataModel(dataModel)
  39. .build()
  40. }
  41. }
言:
我命令你,喜欢我!

Blog

Thoughts

Project