- Hadoop FileSystem 连接器
- 分桶文件 Sink
- 分桶文件 Sink
Hadoop FileSystem 连接器
这个连接器可以向所有 Hadoop FileSystem 支持的文件系统写入分区文件。使用前,需要在工程里添加下面的依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.9.0</version></dependency>
注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 这里。
分桶文件 Sink
关于分桶的配置我们后面会有讲述,这里先创建一个分桶 sink,默认情况下这个 sink 会将数据写入到按照时间切分的滚动文件中:
DataStream<String> input = ...;input.addSink(new BucketingSink<String>("/base/path"));
val input: DataStream[String] = ...input.addSink(new BucketingSink[String]("/base/path"))
初始化时只需要一个参数,这个参数表示分桶文件存储的路径。分桶 sink 可以通过指定自定义的 bucketer、 writer 和 batch 值进一步配置。
默认情况下,当数据到来时,分桶 sink 会按照系统时间对数据进行切分,并以 "yyyy-MM-dd—HH" 的时间格式给每个桶命名。然后 DateTimeFormatter 按照这个时间格式将当前系统时间以 JVM 默认时区转换成分桶的路径。用户可以自定义时区来生成分桶的路径。每遇到一个新的日期都会产生一个新的桶。例如,如果时间的格式以分钟为粒度,那么每分钟都会产生一个桶。每个桶都是一个目录,目录下包含了几个部分文件(part files):每个 sink 的并发实例都会创建一个属于自己的部分文件,当这些文件太大的时候,sink 会产生新的部分文件。当一个桶不再活跃时,打开的部分文件会刷盘并且关闭。如果一个桶最近一段时间都没有写入,那么这个桶被认为是不活跃的。sink 默认会每分钟检查不活跃的桶、关闭那些超过一分钟没有写入的桶。这些行为可以通过 BucketingSink 的 setInactiveBucketCheckInterval() 和 setInactiveBucketThreshold() 进行设置。
可以调用BucketingSink 的 setBucketer() 方法指定自定义的 bucketer,如果需要的话,也可以使用一个元素或者元组属性来决定桶的路径。
默认的 writer 是 StringWriter。数据到达时,通过 toString() 方法得到内容,内容以换行符分隔,StringWriter 将数据内容写入部分文件。可以通过 BucketingSink 的 setWriter() 指定自定义的 writer。SequenceFileWriter 支持写入 HadoopSequenceFiles,并且可以配置是否开启压缩。
关闭部分文件和打开新部分文件的时机可以通过两个配置来确定:
- 设置文件大小(默认文件大小是384MB)
- 设置文件滚动周期,单位是毫秒(默认滚动周期是
Long.MAX_VALUE)
当上述两个条件中的任意一个被满足,都会生成一个新的部分文件。
示例:
DataStream<Tuple2<IntWritable,Text>> input = ...;BucketingSink<Tuple2<IntWritable,Text>> sink = new BucketingSink<Tuple2<IntWritable,Text>>("/base/path");sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));sink.setWriter(new SequenceFileWriter<IntWritable, Text>());sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 minsinput.addSink(sink);
// the SequenceFileWriter only works with Flink Tuplesimport org.apache.flink.api.java.tuple.Tuple2val input: DataStream[Tuple2[A, B]] = ...val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))sink.setWriter(new SequenceFileWriter[IntWritable, Text])sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 minsinput.addSink(sink)
上述代码会创建一个 sink,这个 sink 按下面的模式写入桶文件:
/base/path/{date-time}/part-{parallel-task}-{count}
date-time 是我们从日期/时间格式获得的字符串,parallel-task 是 sink 并发实例的索引,count 是因文件大小或者滚动周期而产生的文件的编号。
更多信息,请参考 BucketingSink。
