一、PyFlink 环境变量:决定你 Job 如何被执行
在 PyFlink 中,有些行为并不是由代码直接决定的,而是由环境变量控制。其中最重要的两个是:
1. FLINK_HOME:你到底在用哪套 Flink
PyFlink 在提交任务前,会先对 Job 进行编译和打包,这一步依赖 Flink 的发行版。
- 默认情况下:
PyFlink 自带了一套 Flink 发行版 - 你也可以通过
FLINK_HOME指定一套自定义 Flink 安装
适用场景包括:
- 本地调试和集群版本严格对齐
- 使用官方发行版未包含的 patch 或定制组件
2. PYFLINK_CLIENT_EXECUTABLE:用哪一个 Python 解释器跑任务
这个变量决定了:
flink run提交 PyFlink 任务时- Java / Scala Job 中执行 Python UDF 时
实际使用的 Python 解释器路径
优先级顺序如下(非常重要):
- 代码中显式配置
python.client.executable - 环境变量
PYFLINK_CLIENT_EXECUTABLE flink-conf.yaml中的python.client.executable- 默认使用
python
如果你遇到:
- 本地可以跑,集群跑不了
- 虚拟环境 / Conda 环境不生效
80% 的问题都和这个配置有关。
二、Hadoop Formats:让 Flink 直接复用 Hadoop 生态
Flink 并没有重复造轮子,而是通过Hadoop Compatibility 模块,直接复用 Hadoop 的 InputFormat 体系。
1. 依赖配置
要使用 Hadoop InputFormat,首先需要引入:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility</artifactId><version>2.2.0</version></dependency>如果你是在IDE 本地运行(而不是直接提交到集群),还需要额外加上:
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.10.2</version><scope>provided</scope></dependency>2. 使用 Hadoop InputFormat 的方式
Flink 并不会直接使用 Hadoop InputFormat,而是通过HadoopInputs做一层包装:
readHadoopFile:适用于FileInputFormatcreateHadoopInput:通用 InputFormat
最终得到的 DataStream 类型是:
Tuple2<Key, Value>3. 示例:读取 Hadoop TextInputFormat
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();KeyValueTextInputFormattextInputFormat=newKeyValueTextInputFormat();DataStream<Tuple2<Text,Text>>input=env.createInput(HadoopInputs.readHadoopFile(textInputFormat,Text.class,Text.class,textPath));这种方式非常适合:
- 历史 Hadoop 数据迁移
- 混合 Flink + HDFS 的存量数据处理
三、DataGen Connector:本地开发与 Demo 的利器
如果你只是想:
- 验证算子逻辑
- 本地跑一条完整 pipeline
- 做一个不依赖 Kafka / DB 的 Demo
那么DataGen Connector几乎是必选项。
1. 核心特性
- 内置 Source,无需额外依赖
- 并行生成数据
- 支持速率限制
- 可控的确定性(利于 Exactly-Once)
2. 基本用法
GeneratorFunction<Long,String>generatorFunction=index->"Number: "+index;DataGeneratorSource<String>source=newDataGeneratorSource<>(generatorFunction,1000,Types.STRING);env.fromSource(source,WatermarkStrategy.noWatermarks(),"Generator Source");如果并行度为 1,生成的数据顺序就是:
Number: 0 → Number: 9993. 限速:模拟真实流量
DataGeneratorSource<String>source=newDataGeneratorSource<>(generatorFunction,Long.MAX_VALUE,RateLimiterStrategy.perSecond(100),Types.STRING);这在以下场景非常有价值:
- 压测下游算子
- 模拟 Kafka 流速
- Demo 中避免「数据瞬间跑完」
四、Kafka Connector:Flink 生产环境的中枢神经
Kafka Connector 是 Flink 流处理体系中最核心、最复杂、也是最成熟的连接器。
1. 重要说明(Flink 2.2)
- Flink 提供的是通用 Kafka Connector
- PyFlink 暂时没有 SQL Kafka Jar
- Streaming Connector不包含在 Flink 二进制包中
2. Kafka Source:新一代 Data Source API
基本构建方式(Java)
KafkaSource<String>source=KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(newSimpleStringSchema()).build();env.fromSource(source,WatermarkStrategy.noWatermarks(),"Kafka Source");支持的订阅方式
- Topic 列表
- Topic 正则
- 指定 Partition 集合
起始 Offset 策略
- earliest / latest
- committed offsets
- timestamp
- 自定义(PyFlink 暂不支持)
有界 / 无界模式
- Streaming(默认)
- Batch(
setBounded) - Streaming + stopping offset
3. Kafka Sink:Exactly-Once 的关键组件
KafkaSink 支持三种投递语义:
| 语义 | 是否丢数据 | 是否重复 |
|---|---|---|
| NONE | 可能 | 可能 |
| AT_LEAST_ONCE | 不丢 | 可能 |
| EXACTLY_ONCE | 不丢 | 不重复 |
Exactly-Once 的核心机制是:
- Kafka Transaction
- Checkpoint 对齐提交
⚠️ 注意事项:
- 必须开启 Checkpoint
transactionalIdPrefix必须全局唯一- Kafka 事务超时时间要大于 checkpoint + 重启时间
4. 监控、指标与安全
Kafka Connector 暴露了大量指标,包括:
- 消费延迟
- Watermark 滞后
- 未消费消息数
- Offset 提交情况
- Kafka 原生 Consumer / Producer Metrics
同时也支持:
- SASL / SSL
- Kerberos
- Rack Awareness(云环境降延迟)
五、总结
这篇文章可以概括为一句话:
Flink 的强大不只在算子,而在它如何优雅地连接整个数据世界。
- PyFlink 环境变量决定了你「跑不跑得起来」
- Hadoop Formats 决定了你「能不能吃老数据」
- DataGen 决定了你「调试是不是高效」
- Kafka Connector 决定了你「生产系统稳不稳」