flink 动态 schema 写 parquet
2019-05-03
背景
之前我司的日志处理流程是,日志落盘到文件,spark 每天定时任务去清洗日志,生成 parquet 然后从 hive 里读取,由于之前的日志 一直没有统一的 schema,相当于每一个新打点都得写一个新的解析操作,然后去 hive 建表这种,是一种批处理的逻辑。
现在上了 flink 以及新的打点系统就打算把这一套用 flink 实现一下,期望如下:
- flink 实时将日志写到 parquet
- 新加打点 schema 能够动态感知,不需要重启任务
- hive 那边能实时读取
目标有了,那就开搞
实现
新的日志还是走 kafka,但是有一个不同的是新的日志有一个服务来维护所有打点的 schema
flink 写 parquet
翻了下官方文档 flink 是支持 sink 到 parquet 的,flink 的 Streaming File Sink[1] 支持写入到 parquet 这种列式存储,
需要注意的是列式存储的存储策略是 OnCheckpointRollingPolicy
, 也就是在 checkpoint 的时候去完成这个周期的写入,而行存储
模式可以根据文件大小来设置写入策略。
猜想原因应该是为了保证一致性,行存储其实是无脑追加,程序只用维护一个文件的 offset,在这个周期如果发生异常就直接 trim 掉 offset 后的数据然后重新写入即可,而列存储的话相当于维护了一个转置的表格,由于同类型数据都放在一起,程序很难像行存储一样去维护这样一个 offset,所以只能是每个 checkpoint 周期开启一个新的文件进行写入,发生异常回滚整个文件这种操作。
flink 提供了 StreamingFileSink.fowBulkFormat 这样的方法来写列存储,具体参数如下
/**
* Creates the builder for a {@link StreamingFileSink} with row-encoding format.
* @param basePath the base path where all the buckets are going to be created as sub-directories.
* @param writerFactory the {@link BulkWriter.Factory} to be used when writing elements in the buckets.
* @param <IN> the type of incoming elements
* @return The builder where the remaining of the configuration parameters for the sink can be configured.
* In order to instantiate the sink, call {@link RowFormatBuilder#build()} after specifying the desired parameters.
*/
public static <IN> StreamingFileSink.BulkFormatBuilder<IN, String> forBulkFormat(
final Path basePath, final BulkWriter.Factory<IN> writerFactory) {
return new StreamingFileSink.BulkFormatBuilder<>(basePath, writerFactory, new DateTimeBucketAssigner<>());
}
需要一个 basePath 来当根目录,需要一个 BulkWriter.Factory 类型的方法来做具体的写入操作,flink-parquet 提供了 ParquetAvroWriters 来生成这个 Factory,提供了三个方法:
- forSpecificRecord: 传入一个 avro 类型来获取 schema
- forGenericRecord: 直接传入一个 Schema 类型
- forReflectRecord: 传入普通的类型比如 POJO、case class 通过反射来获取 schema
另外还指出 bucket 设置,继承 BasePathBucketAssigner
重写 getBucketId
方法生成形如 k1={v1}/k2={v2}
这样的路径
即可完成分区,hive 可以直接使用岂不是美滋滋,所以一个简单的写 parquet 程序如下:
package flink.note
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
object SinkParquetExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.enableCheckpointing(1000)
val source =env.socketTextStream("localhost", 9999)
.map(p => {
val rs = p.split(',')
Data(rs(0), rs(1).toInt
)
})
val sink = StreamingFileSink
.forBulkFormat(
new Path("file:///Users/garland/Desktop/tmp"),
ParquetAvroWriters.forReflectRecord(classOf[Data])
)
.withBucketAssigner(new CustomBucketAssigner)
.build()
source.addSink(sink)
env.execute()
}
class CustomBucketAssigner extends BasePathBucketAssigner[Data] {
override def getBucketId(element: Data, context: BucketAssigner.Context): String = s"name=${element.name}"
}
case class Data(name: String, count: Int)
}
这里需要增加下面两个依赖
val flinkParquet = "org.apache.flink" % "flink-parquet" % flinkVersion
val ParquetAvro = "org.apache.parquet" % "parquet-avro" % "1.10.0"
支持动态 schema
👆说生成 BulkWriter.Factory
可以通过 forGenericRecord
之间传入一个 Schema 类型来实现,那么支持动态
schema 只用在这个方法里动态去获取 schema 就可以了,所以可以实现一个 CustomParquetWriter
示例如下
package flink.note
import org.apache.flink.formats.parquet.ParquetBuilder
import org.apache.flink.formats.parquet.ParquetWriterFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.io.OutputFile
import java.io.IOException
import flink.note.SchemaGetter
object CustomParquetWriter {
/**
* Creates a ParquetWriterFactory that accepts and writes Avro generic types.
* The Parquet writers will use the given schema to build and write the columnar data.
*
* @param schemaServiceURL The URL of avroSchema.
*/
def forStringToGenericRecord(schemaServiceURL: String): ParquetWriterFactory[GenericRecord] = {
val builder = new ParquetBuilder[GenericRecord] {
override def createWriter(out: OutputFile): ParquetWriter[GenericRecord] = {
// 这里进行动态获取 schema 的操作
val schemaString = SchemaGetter.getAvroSchema(schemaServiceURL)
createAvroParquetWriter(schemaString, GenericData.get, out)
}
}
new ParquetWriterFactory[GenericRecord](builder)
}
@throws[IOException]
private def createAvroParquetWriter[T](
schemaString: String,
dataModel: GenericData,
out: OutputFile
): ParquetWriter[T] = {
val schema: Schema = new Schema.Parser().parse(schemaString)
AvroParquetWriter
.builder[T](out)
.withSchema(schema)
.withDataModel(dataModel)
.build()
}
}