基于 StreamingFileSink 实现 Flink 分布式文件持久化

817
类别: 
开发交流

目录

  1. 代码结构
    代码解析
    (1) 主程序入口
    (2) 配置文件 Sink
    (3) 添加 Sink 到数据流
    (4) 执行任务
  2. 输出结果

这段代码展示了如何使用 Apache Flink 将数据流以文本形式写入文件,并配置了文件的滚动策略。以下是对代码的详细解析和说明:

1. 代码结构

包声明:package sink
定义了代码所在的包。

导入依赖:
导入了必要的 Java 和 Flink 相关类库,包括:
java.util.concurrent.TimeUnit:用于时间单位转换。
org.apache.flink:Flink 的核心类库。
org.apache.flink.streaming.api.functions.sink.filesystem :Flink 的文件 Sink 相关类。

sinkToFile 对象:
主程序入口,包含 Flink 流处理逻辑和文件 Sink 的配置。

package sink

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
/**
*
* @PROJECT_NAME: flink1.13
* @PACKAGE_NAME: sink
* @author: 赵嘉盟-HONOR
* @data: 2023-11-19 23:26
* @DESCRIPTION
*
*/
object sinkToFile {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

val data = env.fromElements(
Event("Mary", "./home", 100L),
Event("Sum", "./cart", 500L),
Event("King", "./prod", 1000L),
Event("King", "./root", 200L)
)

//TODO 直接一文本形式分布式写到文件中
val fileSink = StreamingFileSink
.forRowFormat(
new Path("src/main/resources/output/fileSink"),
new SimpleStringEncoder[String]("UTF-8")
)
.withRollingPolicy(  //指定滚动策略
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(15)) //十五分钟混动
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(5)) //五分钟无数据滚动
.withMaxPartSize(1024*1024*1024)                          //最大文件大小
.build()
)
.build()
data.map(_.toString).addSink(fileSink)

env.execute("SinkFile")
}
}

基于 scala 使用flink 将读取到的数据 分布式 写入到文件中
可以指定滚动策略(滚动时间、滚动方式、最大文件数量等)

代码解析

(1) 主程序入口

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements(
Event("Mary", "./home", 100L),
Event("Sum", "./cart", 500L),
Event("King", "./prod", 1000L),
Event("King", "./root", 200L)
)

创建 Flink 流处理环境 StreamExecutionEnvironment
使用 fromElements 方法生成一个包含 4 个 Event 对象的流。

(2) 配置文件 Sink

val fileSink = StreamingFileSink
.forRowFormat(
new Path("src/main/resources/output/fileSink"),
new SimpleStringEncoder[String]("UTF-8")
)
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(15)) // 15 分钟滚动
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(5)) // 5 分钟无数据滚动
.withMaxPartSize(1024 * 1024 * 1024) // 最大文件大小 1GB
.build()
)
.build()

使用 StreamingFileSink.forRowFormat 创建一个文件 Sink:
指定输出路径为 src/main/resources/output/fileSink
使用 SimpleStringEncoder 将数据编码为 UTF-8 格式的字符串。

配置滚动策略 DefaultRollingPolicy
withRolloverInterval(15):每 15 分钟滚动一次文件。
withInactivityInterval(5):如果 5 分钟内没有新数据,则滚动文件。
withMaxPartSize(1024 * 1024 * 1024):当文件大小达到 1GB 时滚动文件。

(3) 添加 Sink 到数据流

data.map(_.toString).addSink(fileSink)

Event 对象转换为字符串。
将文件 Sink 添加到数据流中。

(4) 执行任务

env.execute("SinkFile")

启动 Flink 流处理任务,任务名称为 SinkFile

3. 输出结果

程序运行后,数据会以文本形式写入 src/main/resources/output/fileSink 目录下的文件中。文件会根据滚动策略进行分割,例如:
每 15 分钟生成一个新文件。
如果 5 分钟内没有新数据,则生成一个新文件。
当文件大小达到 1GB 时生成一个新文件。

文件内容示例:

Event(Mary,./home,100)
Event(Sum,./cart,500)
Event(King,./prod,1000)
Event(King,./root,200)
评论0
/ 1000
4
0
收藏