苗栗县网站建设_网站建设公司_后端工程师_seo优化
2026/1/16 9:49:37 网站建设 项目流程

1、它到底做了什么

  • Source 并行运行:有多少个 source 并发子任务,就把Long的序列切成多少段(sub-sequence)
  • 你提供一个GeneratorFunction<Long, OUT>:把输入的 index(Long)映射成任意事件类型
  • 每个 subtask 内部有序,但全局顺序取决于并行度(parallelism)

一句话:Flink 负责发 index,你负责把 index 变成事件。

2、最小可跑示例:生成 0~999 的字符串

importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassDataGenDemo{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();GeneratorFunction<Long,String>generator=index->"Number: "+index;longnumberOfRecords=1000;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,numberOfRecords,Types.STRING);DataStreamSource<String>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"Generator Source");stream.print();env.execute("datagen-demo");}}

要点:

  • 并行度为 1 时输出是严格"Number: 0""Number: 999"顺序
  • 并行度 > 1 时:每个 subtask 内部仍然按序,但不同 subtask 的结果交织输出

3、限速:控制总吞吐(全局每秒不超过 N 条)

importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.connector.datagen.source.RateLimiterStrategy;GeneratorFunction<Long,String>generator=index->"Number: "+index;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,Long.MAX_VALUE,RateLimiterStrategy.perSecond(100),// 全部 source subtasks 加起来 <= 100 条/sTypes.STRING);

适用场景:

  • 你想模拟“上游流量”但又不想把本机打爆
  • 做算子性能对比、Backpressure 观察、checkpoint 行为观察

4、有界/无界:它“永远是 bounded”,但可以“看起来无界”

  • 语义上永远是 bounded(理论上会结束)
  • numberOfRecords = Long.MAX_VALUE基本等同“不会结束”(实践上像 unbounded)

建议:

  • 要跑有限数据:考虑 BATCH mode,更贴近离线回放
  • 要模拟持续输入:用Long.MAX_VALUE+ rate limit

5、容错语义:at-least-once / end-to-end exactly-once 能不能保证?

可以,但有个硬条件:

  • GeneratorFunction必须对输入 index 完全确定性
    也就是:同一个 index 永远生成同样的输出。

反例(会破坏确定性):

  • random()System.currentTimeMillis()、读外部可变配置、读网络请求结果

正确做法:

  • 用 index 推导数据(例如 hash(index) 生成用户、金额、状态)
  • 或者用固定 seed 的伪随机:new Random(index)(每个 index 固定)

6、Watermark:也可以在 Source 侧发“确定性水位线”

默认例子用noWatermarks(),但你完全可以:

  • 在生成事件里带 eventTime
  • 配合自定义WatermarkStrategy生成 deterministic watermarks
    适合做 event-time 窗口、乱序、迟到数据的测试演示。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询